test_service_account.py 33 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849
  1. # Copyright 2016 Google LLC
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import datetime
  15. import json
  16. import os
  17. import mock
  18. import pytest # type: ignore
  19. from google.auth import _helpers
  20. from google.auth import crypt
  21. from google.auth import exceptions
  22. from google.auth import iam
  23. from google.auth import jwt
  24. from google.auth import transport
  25. from google.auth.credentials import DEFAULT_UNIVERSE_DOMAIN
  26. from google.oauth2 import service_account
  27. import yatest.common as yc
  28. DATA_DIR = os.path.join(os.path.dirname(yc.source_path(__file__)), "..", "data")
  29. with open(os.path.join(DATA_DIR, "privatekey.pem"), "rb") as fh:
  30. PRIVATE_KEY_BYTES = fh.read()
  31. with open(os.path.join(DATA_DIR, "public_cert.pem"), "rb") as fh:
  32. PUBLIC_CERT_BYTES = fh.read()
  33. with open(os.path.join(DATA_DIR, "other_cert.pem"), "rb") as fh:
  34. OTHER_CERT_BYTES = fh.read()
  35. SERVICE_ACCOUNT_JSON_FILE = os.path.join(DATA_DIR, "service_account.json")
  36. SERVICE_ACCOUNT_NON_GDU_JSON_FILE = os.path.join(
  37. DATA_DIR, "service_account_non_gdu.json"
  38. )
  39. FAKE_UNIVERSE_DOMAIN = "universe.foo"
  40. with open(SERVICE_ACCOUNT_JSON_FILE, "rb") as fh:
  41. SERVICE_ACCOUNT_INFO = json.load(fh)
  42. with open(SERVICE_ACCOUNT_NON_GDU_JSON_FILE, "rb") as fh:
  43. SERVICE_ACCOUNT_INFO_NON_GDU = json.load(fh)
  44. SIGNER = crypt.RSASigner.from_string(PRIVATE_KEY_BYTES, "1")
  45. class TestCredentials(object):
  46. SERVICE_ACCOUNT_EMAIL = "service-account@example.com"
  47. TOKEN_URI = "https://example.com/oauth2/token"
  48. @classmethod
  49. def make_credentials(cls, universe_domain=DEFAULT_UNIVERSE_DOMAIN):
  50. return service_account.Credentials(
  51. SIGNER,
  52. cls.SERVICE_ACCOUNT_EMAIL,
  53. cls.TOKEN_URI,
  54. universe_domain=universe_domain,
  55. )
  56. def test_get_cred_info(self):
  57. credentials = self.make_credentials()
  58. assert not credentials.get_cred_info()
  59. credentials._cred_file_path = "/path/to/file"
  60. assert credentials.get_cred_info() == {
  61. "credential_source": "/path/to/file",
  62. "credential_type": "service account credentials",
  63. "principal": "service-account@example.com",
  64. }
  65. def test__make_copy_get_cred_info(self):
  66. credentials = self.make_credentials()
  67. credentials._cred_file_path = "/path/to/file"
  68. cred_copy = credentials._make_copy()
  69. assert cred_copy._cred_file_path == "/path/to/file"
  70. def test_constructor_no_universe_domain(self):
  71. credentials = service_account.Credentials(
  72. SIGNER, self.SERVICE_ACCOUNT_EMAIL, self.TOKEN_URI, universe_domain=None
  73. )
  74. assert credentials.universe_domain == DEFAULT_UNIVERSE_DOMAIN
  75. def test_from_service_account_info(self):
  76. credentials = service_account.Credentials.from_service_account_info(
  77. SERVICE_ACCOUNT_INFO
  78. )
  79. assert credentials._signer.key_id == SERVICE_ACCOUNT_INFO["private_key_id"]
  80. assert credentials.service_account_email == SERVICE_ACCOUNT_INFO["client_email"]
  81. assert credentials._token_uri == SERVICE_ACCOUNT_INFO["token_uri"]
  82. assert credentials._universe_domain == DEFAULT_UNIVERSE_DOMAIN
  83. assert not credentials._always_use_jwt_access
  84. def test_from_service_account_info_non_gdu(self):
  85. credentials = service_account.Credentials.from_service_account_info(
  86. SERVICE_ACCOUNT_INFO_NON_GDU
  87. )
  88. assert credentials.universe_domain == FAKE_UNIVERSE_DOMAIN
  89. assert credentials._always_use_jwt_access
  90. def test_from_service_account_info_args(self):
  91. info = SERVICE_ACCOUNT_INFO.copy()
  92. scopes = ["email", "profile"]
  93. subject = "subject"
  94. additional_claims = {"meta": "data"}
  95. credentials = service_account.Credentials.from_service_account_info(
  96. info, scopes=scopes, subject=subject, additional_claims=additional_claims
  97. )
  98. assert credentials.service_account_email == info["client_email"]
  99. assert credentials.project_id == info["project_id"]
  100. assert credentials._signer.key_id == info["private_key_id"]
  101. assert credentials._token_uri == info["token_uri"]
  102. assert credentials._scopes == scopes
  103. assert credentials._subject == subject
  104. assert credentials._additional_claims == additional_claims
  105. assert not credentials._always_use_jwt_access
  106. def test_from_service_account_file(self):
  107. info = SERVICE_ACCOUNT_INFO.copy()
  108. credentials = service_account.Credentials.from_service_account_file(
  109. SERVICE_ACCOUNT_JSON_FILE
  110. )
  111. assert credentials.service_account_email == info["client_email"]
  112. assert credentials.project_id == info["project_id"]
  113. assert credentials._signer.key_id == info["private_key_id"]
  114. assert credentials._token_uri == info["token_uri"]
  115. def test_from_service_account_file_non_gdu(self):
  116. info = SERVICE_ACCOUNT_INFO_NON_GDU.copy()
  117. credentials = service_account.Credentials.from_service_account_file(
  118. SERVICE_ACCOUNT_NON_GDU_JSON_FILE
  119. )
  120. assert credentials.service_account_email == info["client_email"]
  121. assert credentials.project_id == info["project_id"]
  122. assert credentials._signer.key_id == info["private_key_id"]
  123. assert credentials._token_uri == info["token_uri"]
  124. assert credentials._universe_domain == FAKE_UNIVERSE_DOMAIN
  125. assert credentials._always_use_jwt_access
  126. def test_from_service_account_file_args(self):
  127. info = SERVICE_ACCOUNT_INFO.copy()
  128. scopes = ["email", "profile"]
  129. subject = "subject"
  130. additional_claims = {"meta": "data"}
  131. credentials = service_account.Credentials.from_service_account_file(
  132. SERVICE_ACCOUNT_JSON_FILE,
  133. subject=subject,
  134. scopes=scopes,
  135. additional_claims=additional_claims,
  136. )
  137. assert credentials.service_account_email == info["client_email"]
  138. assert credentials.project_id == info["project_id"]
  139. assert credentials._signer.key_id == info["private_key_id"]
  140. assert credentials._token_uri == info["token_uri"]
  141. assert credentials._scopes == scopes
  142. assert credentials._subject == subject
  143. assert credentials._additional_claims == additional_claims
  144. def test_default_state(self):
  145. credentials = self.make_credentials()
  146. assert not credentials.valid
  147. # Expiration hasn't been set yet
  148. assert not credentials.expired
  149. # Scopes haven't been specified yet
  150. assert credentials.requires_scopes
  151. def test_sign_bytes(self):
  152. credentials = self.make_credentials()
  153. to_sign = b"123"
  154. signature = credentials.sign_bytes(to_sign)
  155. assert crypt.verify_signature(to_sign, signature, PUBLIC_CERT_BYTES)
  156. def test_signer(self):
  157. credentials = self.make_credentials()
  158. assert isinstance(credentials.signer, crypt.Signer)
  159. def test_signer_email(self):
  160. credentials = self.make_credentials()
  161. assert credentials.signer_email == self.SERVICE_ACCOUNT_EMAIL
  162. def test_create_scoped(self):
  163. credentials = self.make_credentials()
  164. scopes = ["email", "profile"]
  165. credentials = credentials.with_scopes(scopes)
  166. assert credentials._scopes == scopes
  167. def test_with_claims(self):
  168. credentials = self.make_credentials()
  169. new_credentials = credentials.with_claims({"meep": "moop"})
  170. assert new_credentials._additional_claims == {"meep": "moop"}
  171. def test_with_quota_project(self):
  172. credentials = self.make_credentials()
  173. new_credentials = credentials.with_quota_project("new-project-456")
  174. assert new_credentials.quota_project_id == "new-project-456"
  175. hdrs = {}
  176. new_credentials.apply(hdrs, token="tok")
  177. assert "x-goog-user-project" in hdrs
  178. def test_with_token_uri(self):
  179. credentials = self.make_credentials()
  180. new_token_uri = "https://example2.com/oauth2/token"
  181. assert credentials._token_uri == self.TOKEN_URI
  182. creds_with_new_token_uri = credentials.with_token_uri(new_token_uri)
  183. assert creds_with_new_token_uri._token_uri == new_token_uri
  184. def test_with_universe_domain(self):
  185. credentials = self.make_credentials()
  186. new_credentials = credentials.with_universe_domain("dummy_universe.com")
  187. assert new_credentials.universe_domain == "dummy_universe.com"
  188. assert new_credentials._always_use_jwt_access
  189. new_credentials = credentials.with_universe_domain("googleapis.com")
  190. assert new_credentials.universe_domain == "googleapis.com"
  191. assert not new_credentials._always_use_jwt_access
  192. def test__with_always_use_jwt_access(self):
  193. credentials = self.make_credentials()
  194. assert not credentials._always_use_jwt_access
  195. new_credentials = credentials.with_always_use_jwt_access(True)
  196. assert new_credentials._always_use_jwt_access
  197. def test__with_always_use_jwt_access_non_default_universe_domain(self):
  198. credentials = self.make_credentials(universe_domain=FAKE_UNIVERSE_DOMAIN)
  199. with pytest.raises(exceptions.InvalidValue) as excinfo:
  200. credentials.with_always_use_jwt_access(False)
  201. assert excinfo.match(
  202. "always_use_jwt_access should be True for non-default universe domain"
  203. )
  204. def test__make_authorization_grant_assertion(self):
  205. credentials = self.make_credentials()
  206. token = credentials._make_authorization_grant_assertion()
  207. payload = jwt.decode(token, PUBLIC_CERT_BYTES)
  208. assert payload["iss"] == self.SERVICE_ACCOUNT_EMAIL
  209. assert payload["aud"] == service_account._GOOGLE_OAUTH2_TOKEN_ENDPOINT
  210. def test__make_authorization_grant_assertion_scoped(self):
  211. credentials = self.make_credentials()
  212. scopes = ["email", "profile"]
  213. credentials = credentials.with_scopes(scopes)
  214. token = credentials._make_authorization_grant_assertion()
  215. payload = jwt.decode(token, PUBLIC_CERT_BYTES)
  216. assert payload["scope"] == "email profile"
  217. def test__make_authorization_grant_assertion_subject(self):
  218. credentials = self.make_credentials()
  219. subject = "user@example.com"
  220. credentials = credentials.with_subject(subject)
  221. token = credentials._make_authorization_grant_assertion()
  222. payload = jwt.decode(token, PUBLIC_CERT_BYTES)
  223. assert payload["sub"] == subject
  224. def test_apply_with_quota_project_id(self):
  225. credentials = service_account.Credentials(
  226. SIGNER,
  227. self.SERVICE_ACCOUNT_EMAIL,
  228. self.TOKEN_URI,
  229. quota_project_id="quota-project-123",
  230. )
  231. headers = {}
  232. credentials.apply(headers, token="token")
  233. assert headers["x-goog-user-project"] == "quota-project-123"
  234. assert "token" in headers["authorization"]
  235. def test_apply_with_no_quota_project_id(self):
  236. credentials = service_account.Credentials(
  237. SIGNER, self.SERVICE_ACCOUNT_EMAIL, self.TOKEN_URI
  238. )
  239. headers = {}
  240. credentials.apply(headers, token="token")
  241. assert "x-goog-user-project" not in headers
  242. assert "token" in headers["authorization"]
  243. @mock.patch("google.auth.jwt.Credentials", instance=True, autospec=True)
  244. def test__create_self_signed_jwt(self, jwt):
  245. credentials = service_account.Credentials(
  246. SIGNER, self.SERVICE_ACCOUNT_EMAIL, self.TOKEN_URI
  247. )
  248. audience = "https://pubsub.googleapis.com"
  249. credentials._create_self_signed_jwt(audience)
  250. jwt.from_signing_credentials.assert_called_once_with(credentials, audience)
  251. @mock.patch("google.auth.jwt.Credentials", instance=True, autospec=True)
  252. def test__create_self_signed_jwt_with_user_scopes(self, jwt):
  253. credentials = service_account.Credentials(
  254. SIGNER, self.SERVICE_ACCOUNT_EMAIL, self.TOKEN_URI, scopes=["foo"]
  255. )
  256. audience = "https://pubsub.googleapis.com"
  257. credentials._create_self_signed_jwt(audience)
  258. # JWT should not be created if there are user-defined scopes
  259. jwt.from_signing_credentials.assert_not_called()
  260. @mock.patch("google.auth.jwt.Credentials", instance=True, autospec=True)
  261. def test__create_self_signed_jwt_always_use_jwt_access_with_audience(self, jwt):
  262. credentials = service_account.Credentials(
  263. SIGNER,
  264. self.SERVICE_ACCOUNT_EMAIL,
  265. self.TOKEN_URI,
  266. default_scopes=["bar", "foo"],
  267. always_use_jwt_access=True,
  268. )
  269. audience = "https://pubsub.googleapis.com"
  270. credentials._create_self_signed_jwt(audience)
  271. jwt.from_signing_credentials.assert_called_once_with(credentials, audience)
  272. @mock.patch("google.auth.jwt.Credentials", instance=True, autospec=True)
  273. def test__create_self_signed_jwt_always_use_jwt_access_with_audience_similar_jwt_is_reused(
  274. self, jwt
  275. ):
  276. credentials = service_account.Credentials(
  277. SIGNER,
  278. self.SERVICE_ACCOUNT_EMAIL,
  279. self.TOKEN_URI,
  280. default_scopes=["bar", "foo"],
  281. always_use_jwt_access=True,
  282. )
  283. audience = "https://pubsub.googleapis.com"
  284. credentials._create_self_signed_jwt(audience)
  285. credentials._jwt_credentials._audience = audience
  286. credentials._create_self_signed_jwt(audience)
  287. jwt.from_signing_credentials.assert_called_once_with(credentials, audience)
  288. @mock.patch("google.auth.jwt.Credentials", instance=True, autospec=True)
  289. def test__create_self_signed_jwt_always_use_jwt_access_with_scopes(self, jwt):
  290. credentials = service_account.Credentials(
  291. SIGNER,
  292. self.SERVICE_ACCOUNT_EMAIL,
  293. self.TOKEN_URI,
  294. scopes=["bar", "foo"],
  295. always_use_jwt_access=True,
  296. )
  297. audience = "https://pubsub.googleapis.com"
  298. credentials._create_self_signed_jwt(audience)
  299. jwt.from_signing_credentials.assert_called_once_with(
  300. credentials, None, additional_claims={"scope": "bar foo"}
  301. )
  302. @mock.patch("google.auth.jwt.Credentials", instance=True, autospec=True)
  303. def test__create_self_signed_jwt_always_use_jwt_access_with_scopes_similar_jwt_is_reused(
  304. self, jwt
  305. ):
  306. credentials = service_account.Credentials(
  307. SIGNER,
  308. self.SERVICE_ACCOUNT_EMAIL,
  309. self.TOKEN_URI,
  310. scopes=["bar", "foo"],
  311. always_use_jwt_access=True,
  312. )
  313. audience = "https://pubsub.googleapis.com"
  314. credentials._create_self_signed_jwt(audience)
  315. credentials._jwt_credentials.additional_claims = {"scope": "bar foo"}
  316. credentials._create_self_signed_jwt(audience)
  317. jwt.from_signing_credentials.assert_called_once_with(
  318. credentials, None, additional_claims={"scope": "bar foo"}
  319. )
  320. @mock.patch("google.auth.jwt.Credentials", instance=True, autospec=True)
  321. def test__create_self_signed_jwt_always_use_jwt_access_with_default_scopes(
  322. self, jwt
  323. ):
  324. credentials = service_account.Credentials(
  325. SIGNER,
  326. self.SERVICE_ACCOUNT_EMAIL,
  327. self.TOKEN_URI,
  328. default_scopes=["bar", "foo"],
  329. always_use_jwt_access=True,
  330. )
  331. credentials._create_self_signed_jwt(None)
  332. jwt.from_signing_credentials.assert_called_once_with(
  333. credentials, None, additional_claims={"scope": "bar foo"}
  334. )
  335. @mock.patch("google.auth.jwt.Credentials", instance=True, autospec=True)
  336. def test__create_self_signed_jwt_always_use_jwt_access_with_default_scopes_similar_jwt_is_reused(
  337. self, jwt
  338. ):
  339. credentials = service_account.Credentials(
  340. SIGNER,
  341. self.SERVICE_ACCOUNT_EMAIL,
  342. self.TOKEN_URI,
  343. default_scopes=["bar", "foo"],
  344. always_use_jwt_access=True,
  345. )
  346. credentials._create_self_signed_jwt(None)
  347. credentials._jwt_credentials.additional_claims = {"scope": "bar foo"}
  348. credentials._create_self_signed_jwt(None)
  349. jwt.from_signing_credentials.assert_called_once_with(
  350. credentials, None, additional_claims={"scope": "bar foo"}
  351. )
  352. @mock.patch("google.auth.jwt.Credentials", instance=True, autospec=True)
  353. def test__create_self_signed_jwt_always_use_jwt_access(self, jwt):
  354. credentials = service_account.Credentials(
  355. SIGNER,
  356. self.SERVICE_ACCOUNT_EMAIL,
  357. self.TOKEN_URI,
  358. always_use_jwt_access=True,
  359. )
  360. credentials._create_self_signed_jwt(None)
  361. jwt.from_signing_credentials.assert_not_called()
  362. def test_token_usage_metrics_assertion(self):
  363. credentials = service_account.Credentials(
  364. SIGNER,
  365. self.SERVICE_ACCOUNT_EMAIL,
  366. self.TOKEN_URI,
  367. always_use_jwt_access=False,
  368. )
  369. credentials.token = "token"
  370. credentials.expiry = None
  371. headers = {}
  372. credentials.before_request(mock.Mock(), None, None, headers)
  373. assert headers["authorization"] == "Bearer token"
  374. assert headers["x-goog-api-client"] == "cred-type/sa"
  375. def test_token_usage_metrics_self_signed_jwt(self):
  376. credentials = service_account.Credentials(
  377. SIGNER,
  378. self.SERVICE_ACCOUNT_EMAIL,
  379. self.TOKEN_URI,
  380. always_use_jwt_access=True,
  381. )
  382. credentials._create_self_signed_jwt("foo.googleapis.com")
  383. credentials.token = "token"
  384. credentials.expiry = None
  385. headers = {}
  386. credentials.before_request(mock.Mock(), None, None, headers)
  387. assert headers["authorization"] == "Bearer token"
  388. assert headers["x-goog-api-client"] == "cred-type/jwt"
  389. @mock.patch("google.oauth2._client.jwt_grant", autospec=True)
  390. def test_refresh_success(self, jwt_grant):
  391. credentials = self.make_credentials()
  392. token = "token"
  393. jwt_grant.return_value = (
  394. token,
  395. _helpers.utcnow() + datetime.timedelta(seconds=500),
  396. {},
  397. )
  398. request = mock.create_autospec(transport.Request, instance=True)
  399. # Refresh credentials
  400. credentials.refresh(request)
  401. # Check jwt grant call.
  402. assert jwt_grant.called
  403. called_request, token_uri, assertion = jwt_grant.call_args[0]
  404. assert called_request == request
  405. assert token_uri == credentials._token_uri
  406. assert jwt.decode(assertion, PUBLIC_CERT_BYTES)
  407. # No further assertion done on the token, as there are separate tests
  408. # for checking the authorization grant assertion.
  409. # Check that the credentials have the token.
  410. assert credentials.token == token
  411. # Check that the credentials are valid (have a token and are not
  412. # expired)
  413. assert credentials.valid
  414. @mock.patch("google.oauth2._client.jwt_grant", autospec=True)
  415. def test_before_request_refreshes(self, jwt_grant):
  416. credentials = self.make_credentials()
  417. token = "token"
  418. jwt_grant.return_value = (
  419. token,
  420. _helpers.utcnow() + datetime.timedelta(seconds=500),
  421. None,
  422. )
  423. request = mock.create_autospec(transport.Request, instance=True)
  424. # Credentials should start as invalid
  425. assert not credentials.valid
  426. # before_request should cause a refresh
  427. credentials.before_request(request, "GET", "http://example.com?a=1#3", {})
  428. # The refresh endpoint should've been called.
  429. assert jwt_grant.called
  430. # Credentials should now be valid.
  431. assert credentials.valid
  432. @mock.patch("google.auth.jwt.Credentials._make_jwt")
  433. def test_refresh_with_jwt_credentials(self, make_jwt):
  434. credentials = self.make_credentials()
  435. credentials._create_self_signed_jwt("https://pubsub.googleapis.com")
  436. request = mock.create_autospec(transport.Request, instance=True)
  437. token = "token"
  438. expiry = _helpers.utcnow() + datetime.timedelta(seconds=500)
  439. make_jwt.return_value = (b"token", expiry)
  440. # Credentials should start as invalid
  441. assert not credentials.valid
  442. # before_request should cause a refresh
  443. credentials.before_request(request, "GET", "http://example.com?a=1#3", {})
  444. # Credentials should now be valid.
  445. assert credentials.valid
  446. # Assert make_jwt was called
  447. assert make_jwt.call_count == 1
  448. assert credentials.token == token
  449. assert credentials.expiry == expiry
  450. def test_refresh_with_jwt_credentials_token_type_check(self):
  451. credentials = self.make_credentials()
  452. credentials._create_self_signed_jwt("https://pubsub.googleapis.com")
  453. credentials.refresh(mock.Mock())
  454. # Credentials token should be a JWT string.
  455. assert isinstance(credentials.token, str)
  456. payload = jwt.decode(credentials.token, verify=False)
  457. assert payload["aud"] == "https://pubsub.googleapis.com"
  458. @mock.patch("google.oauth2._client.jwt_grant", autospec=True)
  459. @mock.patch("google.auth.jwt.Credentials.refresh", autospec=True)
  460. def test_refresh_jwt_not_used_for_domain_wide_delegation(
  461. self, self_signed_jwt_refresh, jwt_grant
  462. ):
  463. # Create a domain wide delegation credentials by setting the subject.
  464. credentials = service_account.Credentials(
  465. SIGNER,
  466. self.SERVICE_ACCOUNT_EMAIL,
  467. self.TOKEN_URI,
  468. always_use_jwt_access=True,
  469. subject="subject",
  470. )
  471. credentials._create_self_signed_jwt("https://pubsub.googleapis.com")
  472. jwt_grant.return_value = (
  473. "token",
  474. _helpers.utcnow() + datetime.timedelta(seconds=500),
  475. {},
  476. )
  477. request = mock.create_autospec(transport.Request, instance=True)
  478. # Refresh credentials
  479. credentials.refresh(request)
  480. # Make sure we are using jwt_grant and not self signed JWT refresh
  481. # method to obtain the token.
  482. assert jwt_grant.called
  483. assert not self_signed_jwt_refresh.called
  484. def test_refresh_missing_jwt_credentials(self):
  485. credentials = self.make_credentials()
  486. credentials = credentials.with_scopes(["foo", "bar"])
  487. credentials = credentials.with_always_use_jwt_access(True)
  488. assert not credentials._jwt_credentials
  489. credentials.refresh(mock.Mock())
  490. # jwt credentials should have been automatically created with scopes
  491. assert credentials._jwt_credentials is not None
  492. def test_refresh_non_gdu_domain_wide_delegation_not_supported(self):
  493. credentials = self.make_credentials(universe_domain="foo")
  494. credentials._subject = "bar@example.com"
  495. credentials._create_self_signed_jwt("https://pubsub.googleapis.com")
  496. with pytest.raises(exceptions.RefreshError) as excinfo:
  497. credentials.refresh(None)
  498. assert excinfo.match("domain wide delegation is not supported")
  499. class TestIDTokenCredentials(object):
  500. SERVICE_ACCOUNT_EMAIL = "service-account@example.com"
  501. TOKEN_URI = "https://example.com/oauth2/token"
  502. TARGET_AUDIENCE = "https://example.com"
  503. @classmethod
  504. def make_credentials(cls, universe_domain=DEFAULT_UNIVERSE_DOMAIN):
  505. return service_account.IDTokenCredentials(
  506. SIGNER,
  507. cls.SERVICE_ACCOUNT_EMAIL,
  508. cls.TOKEN_URI,
  509. cls.TARGET_AUDIENCE,
  510. universe_domain=universe_domain,
  511. )
  512. def test_constructor_no_universe_domain(self):
  513. credentials = service_account.IDTokenCredentials(
  514. SIGNER,
  515. self.SERVICE_ACCOUNT_EMAIL,
  516. self.TOKEN_URI,
  517. self.TARGET_AUDIENCE,
  518. universe_domain=None,
  519. )
  520. assert credentials._universe_domain == DEFAULT_UNIVERSE_DOMAIN
  521. def test_from_service_account_info(self):
  522. credentials = service_account.IDTokenCredentials.from_service_account_info(
  523. SERVICE_ACCOUNT_INFO, target_audience=self.TARGET_AUDIENCE
  524. )
  525. assert credentials._signer.key_id == SERVICE_ACCOUNT_INFO["private_key_id"]
  526. assert credentials.service_account_email == SERVICE_ACCOUNT_INFO["client_email"]
  527. assert credentials._token_uri == SERVICE_ACCOUNT_INFO["token_uri"]
  528. assert credentials._target_audience == self.TARGET_AUDIENCE
  529. assert not credentials._use_iam_endpoint
  530. def test_from_service_account_info_non_gdu(self):
  531. credentials = service_account.IDTokenCredentials.from_service_account_info(
  532. SERVICE_ACCOUNT_INFO_NON_GDU, target_audience=self.TARGET_AUDIENCE
  533. )
  534. assert (
  535. credentials._signer.key_id == SERVICE_ACCOUNT_INFO_NON_GDU["private_key_id"]
  536. )
  537. assert (
  538. credentials.service_account_email
  539. == SERVICE_ACCOUNT_INFO_NON_GDU["client_email"]
  540. )
  541. assert credentials._token_uri == SERVICE_ACCOUNT_INFO_NON_GDU["token_uri"]
  542. assert credentials._target_audience == self.TARGET_AUDIENCE
  543. assert credentials._use_iam_endpoint
  544. def test_from_service_account_file(self):
  545. info = SERVICE_ACCOUNT_INFO.copy()
  546. credentials = service_account.IDTokenCredentials.from_service_account_file(
  547. SERVICE_ACCOUNT_JSON_FILE, target_audience=self.TARGET_AUDIENCE
  548. )
  549. assert credentials.service_account_email == info["client_email"]
  550. assert credentials._signer.key_id == info["private_key_id"]
  551. assert credentials._token_uri == info["token_uri"]
  552. assert credentials._target_audience == self.TARGET_AUDIENCE
  553. assert not credentials._use_iam_endpoint
  554. def test_from_service_account_file_non_gdu(self):
  555. info = SERVICE_ACCOUNT_INFO_NON_GDU.copy()
  556. credentials = service_account.IDTokenCredentials.from_service_account_file(
  557. SERVICE_ACCOUNT_NON_GDU_JSON_FILE, target_audience=self.TARGET_AUDIENCE
  558. )
  559. assert credentials.service_account_email == info["client_email"]
  560. assert credentials._signer.key_id == info["private_key_id"]
  561. assert credentials._token_uri == info["token_uri"]
  562. assert credentials._target_audience == self.TARGET_AUDIENCE
  563. assert credentials._use_iam_endpoint
  564. def test_default_state(self):
  565. credentials = self.make_credentials()
  566. assert not credentials.valid
  567. # Expiration hasn't been set yet
  568. assert not credentials.expired
  569. def test_sign_bytes(self):
  570. credentials = self.make_credentials()
  571. to_sign = b"123"
  572. signature = credentials.sign_bytes(to_sign)
  573. assert crypt.verify_signature(to_sign, signature, PUBLIC_CERT_BYTES)
  574. def test_signer(self):
  575. credentials = self.make_credentials()
  576. assert isinstance(credentials.signer, crypt.Signer)
  577. def test_signer_email(self):
  578. credentials = self.make_credentials()
  579. assert credentials.signer_email == self.SERVICE_ACCOUNT_EMAIL
  580. def test_with_target_audience(self):
  581. credentials = self.make_credentials()
  582. new_credentials = credentials.with_target_audience("https://new.example.com")
  583. assert new_credentials._target_audience == "https://new.example.com"
  584. def test__with_use_iam_endpoint(self):
  585. credentials = self.make_credentials()
  586. new_credentials = credentials._with_use_iam_endpoint(True)
  587. assert new_credentials._use_iam_endpoint
  588. def test__with_use_iam_endpoint_non_default_universe_domain(self):
  589. credentials = self.make_credentials(universe_domain=FAKE_UNIVERSE_DOMAIN)
  590. with pytest.raises(exceptions.InvalidValue) as excinfo:
  591. credentials._with_use_iam_endpoint(False)
  592. assert excinfo.match(
  593. "use_iam_endpoint should be True for non-default universe domain"
  594. )
  595. def test_with_quota_project(self):
  596. credentials = self.make_credentials()
  597. new_credentials = credentials.with_quota_project("project-foo")
  598. assert new_credentials._quota_project_id == "project-foo"
  599. def test_with_token_uri(self):
  600. credentials = self.make_credentials()
  601. new_token_uri = "https://example2.com/oauth2/token"
  602. assert credentials._token_uri == self.TOKEN_URI
  603. creds_with_new_token_uri = credentials.with_token_uri(new_token_uri)
  604. assert creds_with_new_token_uri._token_uri == new_token_uri
  605. def test__make_authorization_grant_assertion(self):
  606. credentials = self.make_credentials()
  607. token = credentials._make_authorization_grant_assertion()
  608. payload = jwt.decode(token, PUBLIC_CERT_BYTES)
  609. assert payload["iss"] == self.SERVICE_ACCOUNT_EMAIL
  610. assert payload["aud"] == service_account._GOOGLE_OAUTH2_TOKEN_ENDPOINT
  611. assert payload["target_audience"] == self.TARGET_AUDIENCE
  612. @mock.patch("google.oauth2._client.id_token_jwt_grant", autospec=True)
  613. def test_refresh_success(self, id_token_jwt_grant):
  614. credentials = self.make_credentials()
  615. token = "token"
  616. id_token_jwt_grant.return_value = (
  617. token,
  618. _helpers.utcnow() + datetime.timedelta(seconds=500),
  619. {},
  620. )
  621. request = mock.create_autospec(transport.Request, instance=True)
  622. # Refresh credentials
  623. credentials.refresh(request)
  624. # Check jwt grant call.
  625. assert id_token_jwt_grant.called
  626. called_request, token_uri, assertion = id_token_jwt_grant.call_args[0]
  627. assert called_request == request
  628. assert token_uri == credentials._token_uri
  629. assert jwt.decode(assertion, PUBLIC_CERT_BYTES)
  630. # No further assertion done on the token, as there are separate tests
  631. # for checking the authorization grant assertion.
  632. # Check that the credentials have the token.
  633. assert credentials.token == token
  634. # Check that the credentials are valid (have a token and are not
  635. # expired)
  636. assert credentials.valid
  637. @mock.patch(
  638. "google.oauth2._client.call_iam_generate_id_token_endpoint", autospec=True
  639. )
  640. def test_refresh_iam_flow(self, call_iam_generate_id_token_endpoint):
  641. credentials = self.make_credentials()
  642. credentials._use_iam_endpoint = True
  643. token = "id_token"
  644. call_iam_generate_id_token_endpoint.return_value = (
  645. token,
  646. _helpers.utcnow() + datetime.timedelta(seconds=500),
  647. )
  648. request = mock.Mock()
  649. credentials.refresh(request)
  650. req, iam_endpoint, signer_email, target_audience, access_token, universe_domain = call_iam_generate_id_token_endpoint.call_args[
  651. 0
  652. ]
  653. assert req == request
  654. assert iam_endpoint == iam._IAM_IDTOKEN_ENDPOINT
  655. assert signer_email == "service-account@example.com"
  656. assert target_audience == "https://example.com"
  657. decoded_access_token = jwt.decode(access_token, verify=False)
  658. assert decoded_access_token["scope"] == "https://www.googleapis.com/auth/iam"
  659. @mock.patch(
  660. "google.oauth2._client.call_iam_generate_id_token_endpoint", autospec=True
  661. )
  662. def test_refresh_iam_flow_non_gdu(self, call_iam_generate_id_token_endpoint):
  663. credentials = self.make_credentials(universe_domain="fake-universe")
  664. token = "id_token"
  665. call_iam_generate_id_token_endpoint.return_value = (
  666. token,
  667. _helpers.utcnow() + datetime.timedelta(seconds=500),
  668. )
  669. request = mock.Mock()
  670. credentials.refresh(request)
  671. req, iam_endpoint, signer_email, target_audience, access_token, universe_domain = call_iam_generate_id_token_endpoint.call_args[
  672. 0
  673. ]
  674. assert req == request
  675. assert (
  676. iam_endpoint
  677. == "https://iamcredentials.fake-universe/v1/projects/-/serviceAccounts/{}:generateIdToken"
  678. )
  679. assert signer_email == "service-account@example.com"
  680. assert target_audience == "https://example.com"
  681. decoded_access_token = jwt.decode(access_token, verify=False)
  682. assert decoded_access_token["scope"] == "https://www.googleapis.com/auth/iam"
  683. @mock.patch("google.oauth2._client.id_token_jwt_grant", autospec=True)
  684. def test_before_request_refreshes(self, id_token_jwt_grant):
  685. credentials = self.make_credentials()
  686. token = "token"
  687. id_token_jwt_grant.return_value = (
  688. token,
  689. _helpers.utcnow() + datetime.timedelta(seconds=500),
  690. None,
  691. )
  692. request = mock.create_autospec(transport.Request, instance=True)
  693. # Credentials should start as invalid
  694. assert not credentials.valid
  695. # before_request should cause a refresh
  696. credentials.before_request(request, "GET", "http://example.com?a=1#3", {})
  697. # The refresh endpoint should've been called.
  698. assert id_token_jwt_grant.called
  699. # Credentials should now be valid.
  700. assert credentials.valid