# test_service_account.py (32 KB)
# Copyright 2016 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
  14. import datetime
  15. import json
  16. import os
  17. import mock
  18. import pytest # type: ignore
  19. from google.auth import _helpers
  20. from google.auth import crypt
  21. from google.auth import exceptions
  22. from google.auth import iam
  23. from google.auth import jwt
  24. from google.auth import transport
  25. from google.auth.credentials import DEFAULT_UNIVERSE_DOMAIN
  26. from google.oauth2 import service_account
  27. import yatest.common as yc
  28. DATA_DIR = os.path.join(os.path.dirname(yc.source_path(__file__)), "..", "data")
  29. with open(os.path.join(DATA_DIR, "privatekey.pem"), "rb") as fh:
  30. PRIVATE_KEY_BYTES = fh.read()
  31. with open(os.path.join(DATA_DIR, "public_cert.pem"), "rb") as fh:
  32. PUBLIC_CERT_BYTES = fh.read()
  33. with open(os.path.join(DATA_DIR, "other_cert.pem"), "rb") as fh:
  34. OTHER_CERT_BYTES = fh.read()
  35. SERVICE_ACCOUNT_JSON_FILE = os.path.join(DATA_DIR, "service_account.json")
  36. SERVICE_ACCOUNT_NON_GDU_JSON_FILE = os.path.join(
  37. DATA_DIR, "service_account_non_gdu.json"
  38. )
  39. FAKE_UNIVERSE_DOMAIN = "universe.foo"
  40. with open(SERVICE_ACCOUNT_JSON_FILE, "rb") as fh:
  41. SERVICE_ACCOUNT_INFO = json.load(fh)
  42. with open(SERVICE_ACCOUNT_NON_GDU_JSON_FILE, "rb") as fh:
  43. SERVICE_ACCOUNT_INFO_NON_GDU = json.load(fh)
  44. SIGNER = crypt.RSASigner.from_string(PRIVATE_KEY_BYTES, "1")
  45. class TestCredentials(object):
  46. SERVICE_ACCOUNT_EMAIL = "service-account@example.com"
  47. TOKEN_URI = "https://example.com/oauth2/token"
  48. @classmethod
  49. def make_credentials(cls, universe_domain=DEFAULT_UNIVERSE_DOMAIN):
  50. return service_account.Credentials(
  51. SIGNER,
  52. cls.SERVICE_ACCOUNT_EMAIL,
  53. cls.TOKEN_URI,
  54. universe_domain=universe_domain,
  55. )
  56. def test_constructor_no_universe_domain(self):
  57. credentials = service_account.Credentials(
  58. SIGNER, self.SERVICE_ACCOUNT_EMAIL, self.TOKEN_URI, universe_domain=None
  59. )
  60. assert credentials.universe_domain == DEFAULT_UNIVERSE_DOMAIN
  61. def test_from_service_account_info(self):
  62. credentials = service_account.Credentials.from_service_account_info(
  63. SERVICE_ACCOUNT_INFO
  64. )
  65. assert credentials._signer.key_id == SERVICE_ACCOUNT_INFO["private_key_id"]
  66. assert credentials.service_account_email == SERVICE_ACCOUNT_INFO["client_email"]
  67. assert credentials._token_uri == SERVICE_ACCOUNT_INFO["token_uri"]
  68. assert credentials._universe_domain == DEFAULT_UNIVERSE_DOMAIN
  69. assert not credentials._always_use_jwt_access
  70. def test_from_service_account_info_non_gdu(self):
  71. credentials = service_account.Credentials.from_service_account_info(
  72. SERVICE_ACCOUNT_INFO_NON_GDU
  73. )
  74. assert credentials.universe_domain == FAKE_UNIVERSE_DOMAIN
  75. assert credentials._always_use_jwt_access
  76. def test_from_service_account_info_args(self):
  77. info = SERVICE_ACCOUNT_INFO.copy()
  78. scopes = ["email", "profile"]
  79. subject = "subject"
  80. additional_claims = {"meta": "data"}
  81. credentials = service_account.Credentials.from_service_account_info(
  82. info, scopes=scopes, subject=subject, additional_claims=additional_claims
  83. )
  84. assert credentials.service_account_email == info["client_email"]
  85. assert credentials.project_id == info["project_id"]
  86. assert credentials._signer.key_id == info["private_key_id"]
  87. assert credentials._token_uri == info["token_uri"]
  88. assert credentials._scopes == scopes
  89. assert credentials._subject == subject
  90. assert credentials._additional_claims == additional_claims
  91. assert not credentials._always_use_jwt_access
  92. def test_from_service_account_file(self):
  93. info = SERVICE_ACCOUNT_INFO.copy()
  94. credentials = service_account.Credentials.from_service_account_file(
  95. SERVICE_ACCOUNT_JSON_FILE
  96. )
  97. assert credentials.service_account_email == info["client_email"]
  98. assert credentials.project_id == info["project_id"]
  99. assert credentials._signer.key_id == info["private_key_id"]
  100. assert credentials._token_uri == info["token_uri"]
  101. def test_from_service_account_file_non_gdu(self):
  102. info = SERVICE_ACCOUNT_INFO_NON_GDU.copy()
  103. credentials = service_account.Credentials.from_service_account_file(
  104. SERVICE_ACCOUNT_NON_GDU_JSON_FILE
  105. )
  106. assert credentials.service_account_email == info["client_email"]
  107. assert credentials.project_id == info["project_id"]
  108. assert credentials._signer.key_id == info["private_key_id"]
  109. assert credentials._token_uri == info["token_uri"]
  110. assert credentials._universe_domain == FAKE_UNIVERSE_DOMAIN
  111. assert credentials._always_use_jwt_access
  112. def test_from_service_account_file_args(self):
  113. info = SERVICE_ACCOUNT_INFO.copy()
  114. scopes = ["email", "profile"]
  115. subject = "subject"
  116. additional_claims = {"meta": "data"}
  117. credentials = service_account.Credentials.from_service_account_file(
  118. SERVICE_ACCOUNT_JSON_FILE,
  119. subject=subject,
  120. scopes=scopes,
  121. additional_claims=additional_claims,
  122. )
  123. assert credentials.service_account_email == info["client_email"]
  124. assert credentials.project_id == info["project_id"]
  125. assert credentials._signer.key_id == info["private_key_id"]
  126. assert credentials._token_uri == info["token_uri"]
  127. assert credentials._scopes == scopes
  128. assert credentials._subject == subject
  129. assert credentials._additional_claims == additional_claims
  130. def test_default_state(self):
  131. credentials = self.make_credentials()
  132. assert not credentials.valid
  133. # Expiration hasn't been set yet
  134. assert not credentials.expired
  135. # Scopes haven't been specified yet
  136. assert credentials.requires_scopes
  137. def test_sign_bytes(self):
  138. credentials = self.make_credentials()
  139. to_sign = b"123"
  140. signature = credentials.sign_bytes(to_sign)
  141. assert crypt.verify_signature(to_sign, signature, PUBLIC_CERT_BYTES)
  142. def test_signer(self):
  143. credentials = self.make_credentials()
  144. assert isinstance(credentials.signer, crypt.Signer)
  145. def test_signer_email(self):
  146. credentials = self.make_credentials()
  147. assert credentials.signer_email == self.SERVICE_ACCOUNT_EMAIL
  148. def test_create_scoped(self):
  149. credentials = self.make_credentials()
  150. scopes = ["email", "profile"]
  151. credentials = credentials.with_scopes(scopes)
  152. assert credentials._scopes == scopes
  153. def test_with_claims(self):
  154. credentials = self.make_credentials()
  155. new_credentials = credentials.with_claims({"meep": "moop"})
  156. assert new_credentials._additional_claims == {"meep": "moop"}
  157. def test_with_quota_project(self):
  158. credentials = self.make_credentials()
  159. new_credentials = credentials.with_quota_project("new-project-456")
  160. assert new_credentials.quota_project_id == "new-project-456"
  161. hdrs = {}
  162. new_credentials.apply(hdrs, token="tok")
  163. assert "x-goog-user-project" in hdrs
  164. def test_with_token_uri(self):
  165. credentials = self.make_credentials()
  166. new_token_uri = "https://example2.com/oauth2/token"
  167. assert credentials._token_uri == self.TOKEN_URI
  168. creds_with_new_token_uri = credentials.with_token_uri(new_token_uri)
  169. assert creds_with_new_token_uri._token_uri == new_token_uri
  170. def test_with_universe_domain(self):
  171. credentials = self.make_credentials()
  172. new_credentials = credentials.with_universe_domain("dummy_universe.com")
  173. assert new_credentials.universe_domain == "dummy_universe.com"
  174. assert new_credentials._always_use_jwt_access
  175. new_credentials = credentials.with_universe_domain("googleapis.com")
  176. assert new_credentials.universe_domain == "googleapis.com"
  177. assert not new_credentials._always_use_jwt_access
  178. def test__with_always_use_jwt_access(self):
  179. credentials = self.make_credentials()
  180. assert not credentials._always_use_jwt_access
  181. new_credentials = credentials.with_always_use_jwt_access(True)
  182. assert new_credentials._always_use_jwt_access
  183. def test__with_always_use_jwt_access_non_default_universe_domain(self):
  184. credentials = self.make_credentials(universe_domain=FAKE_UNIVERSE_DOMAIN)
  185. with pytest.raises(exceptions.InvalidValue) as excinfo:
  186. credentials.with_always_use_jwt_access(False)
  187. assert excinfo.match(
  188. "always_use_jwt_access should be True for non-default universe domain"
  189. )
  190. def test__make_authorization_grant_assertion(self):
  191. credentials = self.make_credentials()
  192. token = credentials._make_authorization_grant_assertion()
  193. payload = jwt.decode(token, PUBLIC_CERT_BYTES)
  194. assert payload["iss"] == self.SERVICE_ACCOUNT_EMAIL
  195. assert payload["aud"] == service_account._GOOGLE_OAUTH2_TOKEN_ENDPOINT
  196. def test__make_authorization_grant_assertion_scoped(self):
  197. credentials = self.make_credentials()
  198. scopes = ["email", "profile"]
  199. credentials = credentials.with_scopes(scopes)
  200. token = credentials._make_authorization_grant_assertion()
  201. payload = jwt.decode(token, PUBLIC_CERT_BYTES)
  202. assert payload["scope"] == "email profile"
  203. def test__make_authorization_grant_assertion_subject(self):
  204. credentials = self.make_credentials()
  205. subject = "user@example.com"
  206. credentials = credentials.with_subject(subject)
  207. token = credentials._make_authorization_grant_assertion()
  208. payload = jwt.decode(token, PUBLIC_CERT_BYTES)
  209. assert payload["sub"] == subject
  210. def test_apply_with_quota_project_id(self):
  211. credentials = service_account.Credentials(
  212. SIGNER,
  213. self.SERVICE_ACCOUNT_EMAIL,
  214. self.TOKEN_URI,
  215. quota_project_id="quota-project-123",
  216. )
  217. headers = {}
  218. credentials.apply(headers, token="token")
  219. assert headers["x-goog-user-project"] == "quota-project-123"
  220. assert "token" in headers["authorization"]
  221. def test_apply_with_no_quota_project_id(self):
  222. credentials = service_account.Credentials(
  223. SIGNER, self.SERVICE_ACCOUNT_EMAIL, self.TOKEN_URI
  224. )
  225. headers = {}
  226. credentials.apply(headers, token="token")
  227. assert "x-goog-user-project" not in headers
  228. assert "token" in headers["authorization"]
  229. @mock.patch("google.auth.jwt.Credentials", instance=True, autospec=True)
  230. def test__create_self_signed_jwt(self, jwt):
  231. credentials = service_account.Credentials(
  232. SIGNER, self.SERVICE_ACCOUNT_EMAIL, self.TOKEN_URI
  233. )
  234. audience = "https://pubsub.googleapis.com"
  235. credentials._create_self_signed_jwt(audience)
  236. jwt.from_signing_credentials.assert_called_once_with(credentials, audience)
  237. @mock.patch("google.auth.jwt.Credentials", instance=True, autospec=True)
  238. def test__create_self_signed_jwt_with_user_scopes(self, jwt):
  239. credentials = service_account.Credentials(
  240. SIGNER, self.SERVICE_ACCOUNT_EMAIL, self.TOKEN_URI, scopes=["foo"]
  241. )
  242. audience = "https://pubsub.googleapis.com"
  243. credentials._create_self_signed_jwt(audience)
  244. # JWT should not be created if there are user-defined scopes
  245. jwt.from_signing_credentials.assert_not_called()
  246. @mock.patch("google.auth.jwt.Credentials", instance=True, autospec=True)
  247. def test__create_self_signed_jwt_always_use_jwt_access_with_audience(self, jwt):
  248. credentials = service_account.Credentials(
  249. SIGNER,
  250. self.SERVICE_ACCOUNT_EMAIL,
  251. self.TOKEN_URI,
  252. default_scopes=["bar", "foo"],
  253. always_use_jwt_access=True,
  254. )
  255. audience = "https://pubsub.googleapis.com"
  256. credentials._create_self_signed_jwt(audience)
  257. jwt.from_signing_credentials.assert_called_once_with(credentials, audience)
  258. @mock.patch("google.auth.jwt.Credentials", instance=True, autospec=True)
  259. def test__create_self_signed_jwt_always_use_jwt_access_with_audience_similar_jwt_is_reused(
  260. self, jwt
  261. ):
  262. credentials = service_account.Credentials(
  263. SIGNER,
  264. self.SERVICE_ACCOUNT_EMAIL,
  265. self.TOKEN_URI,
  266. default_scopes=["bar", "foo"],
  267. always_use_jwt_access=True,
  268. )
  269. audience = "https://pubsub.googleapis.com"
  270. credentials._create_self_signed_jwt(audience)
  271. credentials._jwt_credentials._audience = audience
  272. credentials._create_self_signed_jwt(audience)
  273. jwt.from_signing_credentials.assert_called_once_with(credentials, audience)
  274. @mock.patch("google.auth.jwt.Credentials", instance=True, autospec=True)
  275. def test__create_self_signed_jwt_always_use_jwt_access_with_scopes(self, jwt):
  276. credentials = service_account.Credentials(
  277. SIGNER,
  278. self.SERVICE_ACCOUNT_EMAIL,
  279. self.TOKEN_URI,
  280. scopes=["bar", "foo"],
  281. always_use_jwt_access=True,
  282. )
  283. audience = "https://pubsub.googleapis.com"
  284. credentials._create_self_signed_jwt(audience)
  285. jwt.from_signing_credentials.assert_called_once_with(
  286. credentials, None, additional_claims={"scope": "bar foo"}
  287. )
  288. @mock.patch("google.auth.jwt.Credentials", instance=True, autospec=True)
  289. def test__create_self_signed_jwt_always_use_jwt_access_with_scopes_similar_jwt_is_reused(
  290. self, jwt
  291. ):
  292. credentials = service_account.Credentials(
  293. SIGNER,
  294. self.SERVICE_ACCOUNT_EMAIL,
  295. self.TOKEN_URI,
  296. scopes=["bar", "foo"],
  297. always_use_jwt_access=True,
  298. )
  299. audience = "https://pubsub.googleapis.com"
  300. credentials._create_self_signed_jwt(audience)
  301. credentials._jwt_credentials.additional_claims = {"scope": "bar foo"}
  302. credentials._create_self_signed_jwt(audience)
  303. jwt.from_signing_credentials.assert_called_once_with(
  304. credentials, None, additional_claims={"scope": "bar foo"}
  305. )
  306. @mock.patch("google.auth.jwt.Credentials", instance=True, autospec=True)
  307. def test__create_self_signed_jwt_always_use_jwt_access_with_default_scopes(
  308. self, jwt
  309. ):
  310. credentials = service_account.Credentials(
  311. SIGNER,
  312. self.SERVICE_ACCOUNT_EMAIL,
  313. self.TOKEN_URI,
  314. default_scopes=["bar", "foo"],
  315. always_use_jwt_access=True,
  316. )
  317. credentials._create_self_signed_jwt(None)
  318. jwt.from_signing_credentials.assert_called_once_with(
  319. credentials, None, additional_claims={"scope": "bar foo"}
  320. )
  321. @mock.patch("google.auth.jwt.Credentials", instance=True, autospec=True)
  322. def test__create_self_signed_jwt_always_use_jwt_access_with_default_scopes_similar_jwt_is_reused(
  323. self, jwt
  324. ):
  325. credentials = service_account.Credentials(
  326. SIGNER,
  327. self.SERVICE_ACCOUNT_EMAIL,
  328. self.TOKEN_URI,
  329. default_scopes=["bar", "foo"],
  330. always_use_jwt_access=True,
  331. )
  332. credentials._create_self_signed_jwt(None)
  333. credentials._jwt_credentials.additional_claims = {"scope": "bar foo"}
  334. credentials._create_self_signed_jwt(None)
  335. jwt.from_signing_credentials.assert_called_once_with(
  336. credentials, None, additional_claims={"scope": "bar foo"}
  337. )
  338. @mock.patch("google.auth.jwt.Credentials", instance=True, autospec=True)
  339. def test__create_self_signed_jwt_always_use_jwt_access(self, jwt):
  340. credentials = service_account.Credentials(
  341. SIGNER,
  342. self.SERVICE_ACCOUNT_EMAIL,
  343. self.TOKEN_URI,
  344. always_use_jwt_access=True,
  345. )
  346. credentials._create_self_signed_jwt(None)
  347. jwt.from_signing_credentials.assert_not_called()
  348. def test_token_usage_metrics_assertion(self):
  349. credentials = service_account.Credentials(
  350. SIGNER,
  351. self.SERVICE_ACCOUNT_EMAIL,
  352. self.TOKEN_URI,
  353. always_use_jwt_access=False,
  354. )
  355. credentials.token = "token"
  356. credentials.expiry = None
  357. headers = {}
  358. credentials.before_request(mock.Mock(), None, None, headers)
  359. assert headers["authorization"] == "Bearer token"
  360. assert headers["x-goog-api-client"] == "cred-type/sa"
  361. def test_token_usage_metrics_self_signed_jwt(self):
  362. credentials = service_account.Credentials(
  363. SIGNER,
  364. self.SERVICE_ACCOUNT_EMAIL,
  365. self.TOKEN_URI,
  366. always_use_jwt_access=True,
  367. )
  368. credentials._create_self_signed_jwt("foo.googleapis.com")
  369. credentials.token = "token"
  370. credentials.expiry = None
  371. headers = {}
  372. credentials.before_request(mock.Mock(), None, None, headers)
  373. assert headers["authorization"] == "Bearer token"
  374. assert headers["x-goog-api-client"] == "cred-type/jwt"
  375. @mock.patch("google.oauth2._client.jwt_grant", autospec=True)
  376. def test_refresh_success(self, jwt_grant):
  377. credentials = self.make_credentials()
  378. token = "token"
  379. jwt_grant.return_value = (
  380. token,
  381. _helpers.utcnow() + datetime.timedelta(seconds=500),
  382. {},
  383. )
  384. request = mock.create_autospec(transport.Request, instance=True)
  385. # Refresh credentials
  386. credentials.refresh(request)
  387. # Check jwt grant call.
  388. assert jwt_grant.called
  389. called_request, token_uri, assertion = jwt_grant.call_args[0]
  390. assert called_request == request
  391. assert token_uri == credentials._token_uri
  392. assert jwt.decode(assertion, PUBLIC_CERT_BYTES)
  393. # No further assertion done on the token, as there are separate tests
  394. # for checking the authorization grant assertion.
  395. # Check that the credentials have the token.
  396. assert credentials.token == token
  397. # Check that the credentials are valid (have a token and are not
  398. # expired)
  399. assert credentials.valid
  400. @mock.patch("google.oauth2._client.jwt_grant", autospec=True)
  401. def test_before_request_refreshes(self, jwt_grant):
  402. credentials = self.make_credentials()
  403. token = "token"
  404. jwt_grant.return_value = (
  405. token,
  406. _helpers.utcnow() + datetime.timedelta(seconds=500),
  407. None,
  408. )
  409. request = mock.create_autospec(transport.Request, instance=True)
  410. # Credentials should start as invalid
  411. assert not credentials.valid
  412. # before_request should cause a refresh
  413. credentials.before_request(request, "GET", "http://example.com?a=1#3", {})
  414. # The refresh endpoint should've been called.
  415. assert jwt_grant.called
  416. # Credentials should now be valid.
  417. assert credentials.valid
  418. @mock.patch("google.auth.jwt.Credentials._make_jwt")
  419. def test_refresh_with_jwt_credentials(self, make_jwt):
  420. credentials = self.make_credentials()
  421. credentials._create_self_signed_jwt("https://pubsub.googleapis.com")
  422. request = mock.create_autospec(transport.Request, instance=True)
  423. token = "token"
  424. expiry = _helpers.utcnow() + datetime.timedelta(seconds=500)
  425. make_jwt.return_value = (b"token", expiry)
  426. # Credentials should start as invalid
  427. assert not credentials.valid
  428. # before_request should cause a refresh
  429. credentials.before_request(request, "GET", "http://example.com?a=1#3", {})
  430. # Credentials should now be valid.
  431. assert credentials.valid
  432. # Assert make_jwt was called
  433. assert make_jwt.call_count == 1
  434. assert credentials.token == token
  435. assert credentials.expiry == expiry
  436. def test_refresh_with_jwt_credentials_token_type_check(self):
  437. credentials = self.make_credentials()
  438. credentials._create_self_signed_jwt("https://pubsub.googleapis.com")
  439. credentials.refresh(mock.Mock())
  440. # Credentials token should be a JWT string.
  441. assert isinstance(credentials.token, str)
  442. payload = jwt.decode(credentials.token, verify=False)
  443. assert payload["aud"] == "https://pubsub.googleapis.com"
  444. @mock.patch("google.oauth2._client.jwt_grant", autospec=True)
  445. @mock.patch("google.auth.jwt.Credentials.refresh", autospec=True)
  446. def test_refresh_jwt_not_used_for_domain_wide_delegation(
  447. self, self_signed_jwt_refresh, jwt_grant
  448. ):
  449. # Create a domain wide delegation credentials by setting the subject.
  450. credentials = service_account.Credentials(
  451. SIGNER,
  452. self.SERVICE_ACCOUNT_EMAIL,
  453. self.TOKEN_URI,
  454. always_use_jwt_access=True,
  455. subject="subject",
  456. )
  457. credentials._create_self_signed_jwt("https://pubsub.googleapis.com")
  458. jwt_grant.return_value = (
  459. "token",
  460. _helpers.utcnow() + datetime.timedelta(seconds=500),
  461. {},
  462. )
  463. request = mock.create_autospec(transport.Request, instance=True)
  464. # Refresh credentials
  465. credentials.refresh(request)
  466. # Make sure we are using jwt_grant and not self signed JWT refresh
  467. # method to obtain the token.
  468. assert jwt_grant.called
  469. assert not self_signed_jwt_refresh.called
  470. def test_refresh_missing_jwt_credentials(self):
  471. credentials = self.make_credentials()
  472. credentials = credentials.with_scopes(["foo", "bar"])
  473. credentials = credentials.with_always_use_jwt_access(True)
  474. assert not credentials._jwt_credentials
  475. credentials.refresh(mock.Mock())
  476. # jwt credentials should have been automatically created with scopes
  477. assert credentials._jwt_credentials is not None
  478. def test_refresh_non_gdu_domain_wide_delegation_not_supported(self):
  479. credentials = self.make_credentials(universe_domain="foo")
  480. credentials._subject = "bar@example.com"
  481. credentials._create_self_signed_jwt("https://pubsub.googleapis.com")
  482. with pytest.raises(exceptions.RefreshError) as excinfo:
  483. credentials.refresh(None)
  484. assert excinfo.match("domain wide delegation is not supported")
  485. class TestIDTokenCredentials(object):
  486. SERVICE_ACCOUNT_EMAIL = "service-account@example.com"
  487. TOKEN_URI = "https://example.com/oauth2/token"
  488. TARGET_AUDIENCE = "https://example.com"
  489. @classmethod
  490. def make_credentials(cls, universe_domain=DEFAULT_UNIVERSE_DOMAIN):
  491. return service_account.IDTokenCredentials(
  492. SIGNER,
  493. cls.SERVICE_ACCOUNT_EMAIL,
  494. cls.TOKEN_URI,
  495. cls.TARGET_AUDIENCE,
  496. universe_domain=universe_domain,
  497. )
  498. def test_constructor_no_universe_domain(self):
  499. credentials = service_account.IDTokenCredentials(
  500. SIGNER,
  501. self.SERVICE_ACCOUNT_EMAIL,
  502. self.TOKEN_URI,
  503. self.TARGET_AUDIENCE,
  504. universe_domain=None,
  505. )
  506. assert credentials._universe_domain == DEFAULT_UNIVERSE_DOMAIN
  507. def test_from_service_account_info(self):
  508. credentials = service_account.IDTokenCredentials.from_service_account_info(
  509. SERVICE_ACCOUNT_INFO, target_audience=self.TARGET_AUDIENCE
  510. )
  511. assert credentials._signer.key_id == SERVICE_ACCOUNT_INFO["private_key_id"]
  512. assert credentials.service_account_email == SERVICE_ACCOUNT_INFO["client_email"]
  513. assert credentials._token_uri == SERVICE_ACCOUNT_INFO["token_uri"]
  514. assert credentials._target_audience == self.TARGET_AUDIENCE
  515. assert not credentials._use_iam_endpoint
  516. def test_from_service_account_info_non_gdu(self):
  517. credentials = service_account.IDTokenCredentials.from_service_account_info(
  518. SERVICE_ACCOUNT_INFO_NON_GDU, target_audience=self.TARGET_AUDIENCE
  519. )
  520. assert (
  521. credentials._signer.key_id == SERVICE_ACCOUNT_INFO_NON_GDU["private_key_id"]
  522. )
  523. assert (
  524. credentials.service_account_email
  525. == SERVICE_ACCOUNT_INFO_NON_GDU["client_email"]
  526. )
  527. assert credentials._token_uri == SERVICE_ACCOUNT_INFO_NON_GDU["token_uri"]
  528. assert credentials._target_audience == self.TARGET_AUDIENCE
  529. assert credentials._use_iam_endpoint
  530. def test_from_service_account_file(self):
  531. info = SERVICE_ACCOUNT_INFO.copy()
  532. credentials = service_account.IDTokenCredentials.from_service_account_file(
  533. SERVICE_ACCOUNT_JSON_FILE, target_audience=self.TARGET_AUDIENCE
  534. )
  535. assert credentials.service_account_email == info["client_email"]
  536. assert credentials._signer.key_id == info["private_key_id"]
  537. assert credentials._token_uri == info["token_uri"]
  538. assert credentials._target_audience == self.TARGET_AUDIENCE
  539. assert not credentials._use_iam_endpoint
  540. def test_from_service_account_file_non_gdu(self):
  541. info = SERVICE_ACCOUNT_INFO_NON_GDU.copy()
  542. credentials = service_account.IDTokenCredentials.from_service_account_file(
  543. SERVICE_ACCOUNT_NON_GDU_JSON_FILE, target_audience=self.TARGET_AUDIENCE
  544. )
  545. assert credentials.service_account_email == info["client_email"]
  546. assert credentials._signer.key_id == info["private_key_id"]
  547. assert credentials._token_uri == info["token_uri"]
  548. assert credentials._target_audience == self.TARGET_AUDIENCE
  549. assert credentials._use_iam_endpoint
  550. def test_default_state(self):
  551. credentials = self.make_credentials()
  552. assert not credentials.valid
  553. # Expiration hasn't been set yet
  554. assert not credentials.expired
  555. def test_sign_bytes(self):
  556. credentials = self.make_credentials()
  557. to_sign = b"123"
  558. signature = credentials.sign_bytes(to_sign)
  559. assert crypt.verify_signature(to_sign, signature, PUBLIC_CERT_BYTES)
  560. def test_signer(self):
  561. credentials = self.make_credentials()
  562. assert isinstance(credentials.signer, crypt.Signer)
  563. def test_signer_email(self):
  564. credentials = self.make_credentials()
  565. assert credentials.signer_email == self.SERVICE_ACCOUNT_EMAIL
  566. def test_with_target_audience(self):
  567. credentials = self.make_credentials()
  568. new_credentials = credentials.with_target_audience("https://new.example.com")
  569. assert new_credentials._target_audience == "https://new.example.com"
  570. def test__with_use_iam_endpoint(self):
  571. credentials = self.make_credentials()
  572. new_credentials = credentials._with_use_iam_endpoint(True)
  573. assert new_credentials._use_iam_endpoint
  574. def test__with_use_iam_endpoint_non_default_universe_domain(self):
  575. credentials = self.make_credentials(universe_domain=FAKE_UNIVERSE_DOMAIN)
  576. with pytest.raises(exceptions.InvalidValue) as excinfo:
  577. credentials._with_use_iam_endpoint(False)
  578. assert excinfo.match(
  579. "use_iam_endpoint should be True for non-default universe domain"
  580. )
  581. def test_with_quota_project(self):
  582. credentials = self.make_credentials()
  583. new_credentials = credentials.with_quota_project("project-foo")
  584. assert new_credentials._quota_project_id == "project-foo"
  585. def test_with_token_uri(self):
  586. credentials = self.make_credentials()
  587. new_token_uri = "https://example2.com/oauth2/token"
  588. assert credentials._token_uri == self.TOKEN_URI
  589. creds_with_new_token_uri = credentials.with_token_uri(new_token_uri)
  590. assert creds_with_new_token_uri._token_uri == new_token_uri
  591. def test__make_authorization_grant_assertion(self):
  592. credentials = self.make_credentials()
  593. token = credentials._make_authorization_grant_assertion()
  594. payload = jwt.decode(token, PUBLIC_CERT_BYTES)
  595. assert payload["iss"] == self.SERVICE_ACCOUNT_EMAIL
  596. assert payload["aud"] == service_account._GOOGLE_OAUTH2_TOKEN_ENDPOINT
  597. assert payload["target_audience"] == self.TARGET_AUDIENCE
  598. @mock.patch("google.oauth2._client.id_token_jwt_grant", autospec=True)
  599. def test_refresh_success(self, id_token_jwt_grant):
  600. credentials = self.make_credentials()
  601. token = "token"
  602. id_token_jwt_grant.return_value = (
  603. token,
  604. _helpers.utcnow() + datetime.timedelta(seconds=500),
  605. {},
  606. )
  607. request = mock.create_autospec(transport.Request, instance=True)
  608. # Refresh credentials
  609. credentials.refresh(request)
  610. # Check jwt grant call.
  611. assert id_token_jwt_grant.called
  612. called_request, token_uri, assertion = id_token_jwt_grant.call_args[0]
  613. assert called_request == request
  614. assert token_uri == credentials._token_uri
  615. assert jwt.decode(assertion, PUBLIC_CERT_BYTES)
  616. # No further assertion done on the token, as there are separate tests
  617. # for checking the authorization grant assertion.
  618. # Check that the credentials have the token.
  619. assert credentials.token == token
  620. # Check that the credentials are valid (have a token and are not
  621. # expired)
  622. assert credentials.valid
  623. @mock.patch(
  624. "google.oauth2._client.call_iam_generate_id_token_endpoint", autospec=True
  625. )
  626. def test_refresh_iam_flow(self, call_iam_generate_id_token_endpoint):
  627. credentials = self.make_credentials()
  628. credentials._use_iam_endpoint = True
  629. token = "id_token"
  630. call_iam_generate_id_token_endpoint.return_value = (
  631. token,
  632. _helpers.utcnow() + datetime.timedelta(seconds=500),
  633. )
  634. request = mock.Mock()
  635. credentials.refresh(request)
  636. req, iam_endpoint, signer_email, target_audience, access_token = call_iam_generate_id_token_endpoint.call_args[
  637. 0
  638. ]
  639. assert req == request
  640. assert iam_endpoint == iam._IAM_IDTOKEN_ENDPOINT
  641. assert signer_email == "service-account@example.com"
  642. assert target_audience == "https://example.com"
  643. decoded_access_token = jwt.decode(access_token, verify=False)
  644. assert decoded_access_token["scope"] == "https://www.googleapis.com/auth/iam"
  645. @mock.patch(
  646. "google.oauth2._client.call_iam_generate_id_token_endpoint", autospec=True
  647. )
  648. def test_refresh_iam_flow_non_gdu(self, call_iam_generate_id_token_endpoint):
  649. credentials = self.make_credentials(universe_domain="fake-universe")
  650. token = "id_token"
  651. call_iam_generate_id_token_endpoint.return_value = (
  652. token,
  653. _helpers.utcnow() + datetime.timedelta(seconds=500),
  654. )
  655. request = mock.Mock()
  656. credentials.refresh(request)
  657. req, iam_endpoint, signer_email, target_audience, access_token = call_iam_generate_id_token_endpoint.call_args[
  658. 0
  659. ]
  660. assert req == request
  661. assert (
  662. iam_endpoint
  663. == "https://iamcredentials.fake-universe/v1/projects/-/serviceAccounts/{}:generateIdToken"
  664. )
  665. assert signer_email == "service-account@example.com"
  666. assert target_audience == "https://example.com"
  667. decoded_access_token = jwt.decode(access_token, verify=False)
  668. assert decoded_access_token["scope"] == "https://www.googleapis.com/auth/iam"
  669. @mock.patch("google.oauth2._client.id_token_jwt_grant", autospec=True)
  670. def test_before_request_refreshes(self, id_token_jwt_grant):
  671. credentials = self.make_credentials()
  672. token = "token"
  673. id_token_jwt_grant.return_value = (
  674. token,
  675. _helpers.utcnow() + datetime.timedelta(seconds=500),
  676. None,
  677. )
  678. request = mock.create_autospec(transport.Request, instance=True)
  679. # Credentials should start as invalid
  680. assert not credentials.valid
  681. # before_request should cause a refresh
  682. credentials.before_request(request, "GET", "http://example.com?a=1#3", {})
  683. # The refresh endpoint should've been called.
  684. assert id_token_jwt_grant.called
  685. # Credentials should now be valid.
  686. assert credentials.valid