test_jwt.py 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671
  1. # Copyright 2014 Google Inc.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import base64
  15. import datetime
  16. import json
  17. import os
  18. import mock
  19. import pytest # type: ignore
  20. from google.auth import _helpers
  21. from google.auth import crypt
  22. from google.auth import exceptions
  23. from google.auth import jwt
  24. import yatest.common as yc
  25. DATA_DIR = os.path.join(os.path.dirname(yc.source_path(__file__)), "data")
  26. with open(os.path.join(DATA_DIR, "privatekey.pem"), "rb") as fh:
  27. PRIVATE_KEY_BYTES = fh.read()
  28. with open(os.path.join(DATA_DIR, "public_cert.pem"), "rb") as fh:
  29. PUBLIC_CERT_BYTES = fh.read()
  30. with open(os.path.join(DATA_DIR, "other_cert.pem"), "rb") as fh:
  31. OTHER_CERT_BYTES = fh.read()
  32. with open(os.path.join(DATA_DIR, "es256_privatekey.pem"), "rb") as fh:
  33. EC_PRIVATE_KEY_BYTES = fh.read()
  34. with open(os.path.join(DATA_DIR, "es256_public_cert.pem"), "rb") as fh:
  35. EC_PUBLIC_CERT_BYTES = fh.read()
  36. SERVICE_ACCOUNT_JSON_FILE = os.path.join(DATA_DIR, "service_account.json")
  37. with open(SERVICE_ACCOUNT_JSON_FILE, "rb") as fh:
  38. SERVICE_ACCOUNT_INFO = json.load(fh)
  39. @pytest.fixture
  40. def signer():
  41. return crypt.RSASigner.from_string(PRIVATE_KEY_BYTES, "1")
  42. def test_encode_basic(signer):
  43. test_payload = {"test": "value"}
  44. encoded = jwt.encode(signer, test_payload)
  45. header, payload, _, _ = jwt._unverified_decode(encoded)
  46. assert payload == test_payload
  47. assert header == {"typ": "JWT", "alg": "RS256", "kid": signer.key_id}
  48. def test_encode_extra_headers(signer):
  49. encoded = jwt.encode(signer, {}, header={"extra": "value"})
  50. header = jwt.decode_header(encoded)
  51. assert header == {
  52. "typ": "JWT",
  53. "alg": "RS256",
  54. "kid": signer.key_id,
  55. "extra": "value",
  56. }
  57. def test_encode_custom_alg_in_headers(signer):
  58. encoded = jwt.encode(signer, {}, header={"alg": "foo"})
  59. header = jwt.decode_header(encoded)
  60. assert header == {"typ": "JWT", "alg": "foo", "kid": signer.key_id}
  61. @pytest.fixture
  62. def es256_signer():
  63. return crypt.ES256Signer.from_string(EC_PRIVATE_KEY_BYTES, "1")
  64. def test_encode_basic_es256(es256_signer):
  65. test_payload = {"test": "value"}
  66. encoded = jwt.encode(es256_signer, test_payload)
  67. header, payload, _, _ = jwt._unverified_decode(encoded)
  68. assert payload == test_payload
  69. assert header == {"typ": "JWT", "alg": "ES256", "kid": es256_signer.key_id}
  70. @pytest.fixture
  71. def token_factory(signer, es256_signer):
  72. def factory(claims=None, key_id=None, use_es256_signer=False):
  73. now = _helpers.datetime_to_secs(_helpers.utcnow())
  74. payload = {
  75. "aud": "audience@example.com",
  76. "iat": now,
  77. "exp": now + 300,
  78. "user": "billy bob",
  79. "metadata": {"meta": "data"},
  80. }
  81. payload.update(claims or {})
  82. # False is specified to remove the signer's key id for testing
  83. # headers without key ids.
  84. if key_id is False:
  85. signer._key_id = None
  86. key_id = None
  87. if use_es256_signer:
  88. return jwt.encode(es256_signer, payload, key_id=key_id)
  89. else:
  90. return jwt.encode(signer, payload, key_id=key_id)
  91. return factory
  92. def test_decode_valid(token_factory):
  93. payload = jwt.decode(token_factory(), certs=PUBLIC_CERT_BYTES)
  94. assert payload["aud"] == "audience@example.com"
  95. assert payload["user"] == "billy bob"
  96. assert payload["metadata"]["meta"] == "data"
  97. def test_decode_header_object(token_factory):
  98. payload = token_factory()
  99. # Create a malformed JWT token with a number as a header instead of a
  100. # dictionary (3 == base64d(M7==))
  101. payload = b"M7." + b".".join(payload.split(b".")[1:])
  102. with pytest.raises(ValueError) as excinfo:
  103. jwt.decode(payload, certs=PUBLIC_CERT_BYTES)
  104. assert excinfo.match(r"Header segment should be a JSON object: " + str(b"M7"))
  105. def test_decode_payload_object(signer):
  106. # Create a malformed JWT token with a payload containing both "iat" and
  107. # "exp" strings, although not as fields of a dictionary
  108. payload = jwt.encode(signer, "iatexp")
  109. with pytest.raises(ValueError) as excinfo:
  110. jwt.decode(payload, certs=PUBLIC_CERT_BYTES)
  111. assert excinfo.match(
  112. r"Payload segment should be a JSON object: " + str(b"ImlhdGV4cCI")
  113. )
  114. def test_decode_valid_es256(token_factory):
  115. payload = jwt.decode(
  116. token_factory(use_es256_signer=True), certs=EC_PUBLIC_CERT_BYTES
  117. )
  118. assert payload["aud"] == "audience@example.com"
  119. assert payload["user"] == "billy bob"
  120. assert payload["metadata"]["meta"] == "data"
  121. def test_decode_valid_with_audience(token_factory):
  122. payload = jwt.decode(
  123. token_factory(), certs=PUBLIC_CERT_BYTES, audience="audience@example.com"
  124. )
  125. assert payload["aud"] == "audience@example.com"
  126. assert payload["user"] == "billy bob"
  127. assert payload["metadata"]["meta"] == "data"
  128. def test_decode_valid_with_audience_list(token_factory):
  129. payload = jwt.decode(
  130. token_factory(),
  131. certs=PUBLIC_CERT_BYTES,
  132. audience=["audience@example.com", "another_audience@example.com"],
  133. )
  134. assert payload["aud"] == "audience@example.com"
  135. assert payload["user"] == "billy bob"
  136. assert payload["metadata"]["meta"] == "data"
  137. def test_decode_valid_unverified(token_factory):
  138. payload = jwt.decode(token_factory(), certs=OTHER_CERT_BYTES, verify=False)
  139. assert payload["aud"] == "audience@example.com"
  140. assert payload["user"] == "billy bob"
  141. assert payload["metadata"]["meta"] == "data"
  142. def test_decode_bad_token_wrong_number_of_segments():
  143. with pytest.raises(ValueError) as excinfo:
  144. jwt.decode("1.2", PUBLIC_CERT_BYTES)
  145. assert excinfo.match(r"Wrong number of segments")
  146. def test_decode_bad_token_not_base64():
  147. with pytest.raises((ValueError, TypeError)) as excinfo:
  148. jwt.decode("1.2.3", PUBLIC_CERT_BYTES)
  149. assert excinfo.match(r"Incorrect padding|more than a multiple of 4")
  150. def test_decode_bad_token_not_json():
  151. token = b".".join([base64.urlsafe_b64encode(b"123!")] * 3)
  152. with pytest.raises(ValueError) as excinfo:
  153. jwt.decode(token, PUBLIC_CERT_BYTES)
  154. assert excinfo.match(r"Can\'t parse segment")
  155. def test_decode_bad_token_no_iat_or_exp(signer):
  156. token = jwt.encode(signer, {"test": "value"})
  157. with pytest.raises(ValueError) as excinfo:
  158. jwt.decode(token, PUBLIC_CERT_BYTES)
  159. assert excinfo.match(r"Token does not contain required claim")
  160. def test_decode_bad_token_too_early(token_factory):
  161. token = token_factory(
  162. claims={
  163. "iat": _helpers.datetime_to_secs(
  164. _helpers.utcnow() + datetime.timedelta(hours=1)
  165. )
  166. }
  167. )
  168. with pytest.raises(ValueError) as excinfo:
  169. jwt.decode(token, PUBLIC_CERT_BYTES, clock_skew_in_seconds=59)
  170. assert excinfo.match(r"Token used too early")
  171. def test_decode_bad_token_expired(token_factory):
  172. token = token_factory(
  173. claims={
  174. "exp": _helpers.datetime_to_secs(
  175. _helpers.utcnow() - datetime.timedelta(hours=1)
  176. )
  177. }
  178. )
  179. with pytest.raises(ValueError) as excinfo:
  180. jwt.decode(token, PUBLIC_CERT_BYTES, clock_skew_in_seconds=59)
  181. assert excinfo.match(r"Token expired")
  182. def test_decode_success_with_no_clock_skew(token_factory):
  183. token = token_factory(
  184. claims={
  185. "exp": _helpers.datetime_to_secs(
  186. _helpers.utcnow() + datetime.timedelta(seconds=1)
  187. ),
  188. "iat": _helpers.datetime_to_secs(
  189. _helpers.utcnow() - datetime.timedelta(seconds=1)
  190. ),
  191. }
  192. )
  193. jwt.decode(token, PUBLIC_CERT_BYTES)
  194. def test_decode_success_with_custom_clock_skew(token_factory):
  195. token = token_factory(
  196. claims={
  197. "exp": _helpers.datetime_to_secs(
  198. _helpers.utcnow() + datetime.timedelta(seconds=2)
  199. ),
  200. "iat": _helpers.datetime_to_secs(
  201. _helpers.utcnow() - datetime.timedelta(seconds=2)
  202. ),
  203. }
  204. )
  205. jwt.decode(token, PUBLIC_CERT_BYTES, clock_skew_in_seconds=1)
  206. def test_decode_bad_token_wrong_audience(token_factory):
  207. token = token_factory()
  208. audience = "audience2@example.com"
  209. with pytest.raises(ValueError) as excinfo:
  210. jwt.decode(token, PUBLIC_CERT_BYTES, audience=audience)
  211. assert excinfo.match(r"Token has wrong audience")
  212. def test_decode_bad_token_wrong_audience_list(token_factory):
  213. token = token_factory()
  214. audience = ["audience2@example.com", "audience3@example.com"]
  215. with pytest.raises(ValueError) as excinfo:
  216. jwt.decode(token, PUBLIC_CERT_BYTES, audience=audience)
  217. assert excinfo.match(r"Token has wrong audience")
  218. def test_decode_wrong_cert(token_factory):
  219. with pytest.raises(ValueError) as excinfo:
  220. jwt.decode(token_factory(), OTHER_CERT_BYTES)
  221. assert excinfo.match(r"Could not verify token signature")
  222. def test_decode_multicert_bad_cert(token_factory):
  223. certs = {"1": OTHER_CERT_BYTES, "2": PUBLIC_CERT_BYTES}
  224. with pytest.raises(ValueError) as excinfo:
  225. jwt.decode(token_factory(), certs)
  226. assert excinfo.match(r"Could not verify token signature")
  227. def test_decode_no_cert(token_factory):
  228. certs = {"2": PUBLIC_CERT_BYTES}
  229. with pytest.raises(ValueError) as excinfo:
  230. jwt.decode(token_factory(), certs)
  231. assert excinfo.match(r"Certificate for key id 1 not found")
  232. def test_decode_no_key_id(token_factory):
  233. token = token_factory(key_id=False)
  234. certs = {"2": PUBLIC_CERT_BYTES}
  235. payload = jwt.decode(token, certs)
  236. assert payload["user"] == "billy bob"
  237. def test_decode_unknown_alg():
  238. headers = json.dumps({u"kid": u"1", u"alg": u"fakealg"})
  239. token = b".".join(
  240. map(lambda seg: base64.b64encode(seg.encode("utf-8")), [headers, u"{}", u"sig"])
  241. )
  242. with pytest.raises(ValueError) as excinfo:
  243. jwt.decode(token)
  244. assert excinfo.match(r"fakealg")
  245. def test_decode_missing_crytography_alg(monkeypatch):
  246. monkeypatch.delitem(jwt._ALGORITHM_TO_VERIFIER_CLASS, "ES256")
  247. headers = json.dumps({u"kid": u"1", u"alg": u"ES256"})
  248. token = b".".join(
  249. map(lambda seg: base64.b64encode(seg.encode("utf-8")), [headers, u"{}", u"sig"])
  250. )
  251. with pytest.raises(ValueError) as excinfo:
  252. jwt.decode(token)
  253. assert excinfo.match(r"cryptography")
  254. def test_roundtrip_explicit_key_id(token_factory):
  255. token = token_factory(key_id="3")
  256. certs = {"2": OTHER_CERT_BYTES, "3": PUBLIC_CERT_BYTES}
  257. payload = jwt.decode(token, certs)
  258. assert payload["user"] == "billy bob"
  259. class TestCredentials(object):
  260. SERVICE_ACCOUNT_EMAIL = "service-account@example.com"
  261. SUBJECT = "subject"
  262. AUDIENCE = "audience"
  263. ADDITIONAL_CLAIMS = {"meta": "data"}
  264. credentials = None
  265. @pytest.fixture(autouse=True)
  266. def credentials_fixture(self, signer):
  267. self.credentials = jwt.Credentials(
  268. signer,
  269. self.SERVICE_ACCOUNT_EMAIL,
  270. self.SERVICE_ACCOUNT_EMAIL,
  271. self.AUDIENCE,
  272. )
  273. def test_from_service_account_info(self):
  274. with open(SERVICE_ACCOUNT_JSON_FILE, "r") as fh:
  275. info = json.load(fh)
  276. credentials = jwt.Credentials.from_service_account_info(
  277. info, audience=self.AUDIENCE
  278. )
  279. assert credentials._signer.key_id == info["private_key_id"]
  280. assert credentials._issuer == info["client_email"]
  281. assert credentials._subject == info["client_email"]
  282. assert credentials._audience == self.AUDIENCE
  283. def test_from_service_account_info_args(self):
  284. info = SERVICE_ACCOUNT_INFO.copy()
  285. credentials = jwt.Credentials.from_service_account_info(
  286. info,
  287. subject=self.SUBJECT,
  288. audience=self.AUDIENCE,
  289. additional_claims=self.ADDITIONAL_CLAIMS,
  290. )
  291. assert credentials._signer.key_id == info["private_key_id"]
  292. assert credentials._issuer == info["client_email"]
  293. assert credentials._subject == self.SUBJECT
  294. assert credentials._audience == self.AUDIENCE
  295. assert credentials._additional_claims == self.ADDITIONAL_CLAIMS
  296. def test_from_service_account_file(self):
  297. info = SERVICE_ACCOUNT_INFO.copy()
  298. credentials = jwt.Credentials.from_service_account_file(
  299. SERVICE_ACCOUNT_JSON_FILE, audience=self.AUDIENCE
  300. )
  301. assert credentials._signer.key_id == info["private_key_id"]
  302. assert credentials._issuer == info["client_email"]
  303. assert credentials._subject == info["client_email"]
  304. assert credentials._audience == self.AUDIENCE
  305. def test_from_service_account_file_args(self):
  306. info = SERVICE_ACCOUNT_INFO.copy()
  307. credentials = jwt.Credentials.from_service_account_file(
  308. SERVICE_ACCOUNT_JSON_FILE,
  309. subject=self.SUBJECT,
  310. audience=self.AUDIENCE,
  311. additional_claims=self.ADDITIONAL_CLAIMS,
  312. )
  313. assert credentials._signer.key_id == info["private_key_id"]
  314. assert credentials._issuer == info["client_email"]
  315. assert credentials._subject == self.SUBJECT
  316. assert credentials._audience == self.AUDIENCE
  317. assert credentials._additional_claims == self.ADDITIONAL_CLAIMS
  318. def test_from_signing_credentials(self):
  319. jwt_from_signing = self.credentials.from_signing_credentials(
  320. self.credentials, audience=mock.sentinel.new_audience
  321. )
  322. jwt_from_info = jwt.Credentials.from_service_account_info(
  323. SERVICE_ACCOUNT_INFO, audience=mock.sentinel.new_audience
  324. )
  325. assert isinstance(jwt_from_signing, jwt.Credentials)
  326. assert jwt_from_signing._signer.key_id == jwt_from_info._signer.key_id
  327. assert jwt_from_signing._issuer == jwt_from_info._issuer
  328. assert jwt_from_signing._subject == jwt_from_info._subject
  329. assert jwt_from_signing._audience == jwt_from_info._audience
  330. def test_default_state(self):
  331. assert not self.credentials.valid
  332. # Expiration hasn't been set yet
  333. assert not self.credentials.expired
  334. def test_with_claims(self):
  335. new_audience = "new_audience"
  336. new_credentials = self.credentials.with_claims(audience=new_audience)
  337. assert new_credentials._signer == self.credentials._signer
  338. assert new_credentials._issuer == self.credentials._issuer
  339. assert new_credentials._subject == self.credentials._subject
  340. assert new_credentials._audience == new_audience
  341. assert new_credentials._additional_claims == self.credentials._additional_claims
  342. assert new_credentials._quota_project_id == self.credentials._quota_project_id
  343. def test__make_jwt_without_audience(self):
  344. cred = jwt.Credentials.from_service_account_info(
  345. SERVICE_ACCOUNT_INFO.copy(),
  346. subject=self.SUBJECT,
  347. audience=None,
  348. additional_claims={"scope": "foo bar"},
  349. )
  350. token, _ = cred._make_jwt()
  351. payload = jwt.decode(token, PUBLIC_CERT_BYTES)
  352. assert payload["scope"] == "foo bar"
  353. assert "aud" not in payload
  354. def test_with_quota_project(self):
  355. quota_project_id = "project-foo"
  356. new_credentials = self.credentials.with_quota_project(quota_project_id)
  357. assert new_credentials._signer == self.credentials._signer
  358. assert new_credentials._issuer == self.credentials._issuer
  359. assert new_credentials._subject == self.credentials._subject
  360. assert new_credentials._audience == self.credentials._audience
  361. assert new_credentials._additional_claims == self.credentials._additional_claims
  362. assert new_credentials.additional_claims == self.credentials._additional_claims
  363. assert new_credentials._quota_project_id == quota_project_id
  364. def test_sign_bytes(self):
  365. to_sign = b"123"
  366. signature = self.credentials.sign_bytes(to_sign)
  367. assert crypt.verify_signature(to_sign, signature, PUBLIC_CERT_BYTES)
  368. def test_signer(self):
  369. assert isinstance(self.credentials.signer, crypt.RSASigner)
  370. def test_signer_email(self):
  371. assert self.credentials.signer_email == SERVICE_ACCOUNT_INFO["client_email"]
  372. def _verify_token(self, token):
  373. payload = jwt.decode(token, PUBLIC_CERT_BYTES)
  374. assert payload["iss"] == self.SERVICE_ACCOUNT_EMAIL
  375. return payload
  376. def test_refresh(self):
  377. self.credentials.refresh(None)
  378. assert self.credentials.valid
  379. assert not self.credentials.expired
  380. def test_expired(self):
  381. assert not self.credentials.expired
  382. self.credentials.refresh(None)
  383. assert not self.credentials.expired
  384. with mock.patch("google.auth._helpers.utcnow") as now:
  385. one_day = datetime.timedelta(days=1)
  386. now.return_value = self.credentials.expiry + one_day
  387. assert self.credentials.expired
  388. def test_before_request(self):
  389. headers = {}
  390. self.credentials.refresh(None)
  391. self.credentials.before_request(
  392. None, "GET", "http://example.com?a=1#3", headers
  393. )
  394. header_value = headers["authorization"]
  395. _, token = header_value.split(" ")
  396. # Since the audience is set, it should use the existing token.
  397. assert token.encode("utf-8") == self.credentials.token
  398. payload = self._verify_token(token)
  399. assert payload["aud"] == self.AUDIENCE
  400. def test_before_request_refreshes(self):
  401. assert not self.credentials.valid
  402. self.credentials.before_request(None, "GET", "http://example.com?a=1#3", {})
  403. assert self.credentials.valid
  404. class TestOnDemandCredentials(object):
  405. SERVICE_ACCOUNT_EMAIL = "service-account@example.com"
  406. SUBJECT = "subject"
  407. ADDITIONAL_CLAIMS = {"meta": "data"}
  408. credentials = None
  409. @pytest.fixture(autouse=True)
  410. def credentials_fixture(self, signer):
  411. self.credentials = jwt.OnDemandCredentials(
  412. signer,
  413. self.SERVICE_ACCOUNT_EMAIL,
  414. self.SERVICE_ACCOUNT_EMAIL,
  415. max_cache_size=2,
  416. )
  417. def test_from_service_account_info(self):
  418. with open(SERVICE_ACCOUNT_JSON_FILE, "r") as fh:
  419. info = json.load(fh)
  420. credentials = jwt.OnDemandCredentials.from_service_account_info(info)
  421. assert credentials._signer.key_id == info["private_key_id"]
  422. assert credentials._issuer == info["client_email"]
  423. assert credentials._subject == info["client_email"]
  424. def test_from_service_account_info_args(self):
  425. info = SERVICE_ACCOUNT_INFO.copy()
  426. credentials = jwt.OnDemandCredentials.from_service_account_info(
  427. info, subject=self.SUBJECT, additional_claims=self.ADDITIONAL_CLAIMS
  428. )
  429. assert credentials._signer.key_id == info["private_key_id"]
  430. assert credentials._issuer == info["client_email"]
  431. assert credentials._subject == self.SUBJECT
  432. assert credentials._additional_claims == self.ADDITIONAL_CLAIMS
  433. def test_from_service_account_file(self):
  434. info = SERVICE_ACCOUNT_INFO.copy()
  435. credentials = jwt.OnDemandCredentials.from_service_account_file(
  436. SERVICE_ACCOUNT_JSON_FILE
  437. )
  438. assert credentials._signer.key_id == info["private_key_id"]
  439. assert credentials._issuer == info["client_email"]
  440. assert credentials._subject == info["client_email"]
  441. def test_from_service_account_file_args(self):
  442. info = SERVICE_ACCOUNT_INFO.copy()
  443. credentials = jwt.OnDemandCredentials.from_service_account_file(
  444. SERVICE_ACCOUNT_JSON_FILE,
  445. subject=self.SUBJECT,
  446. additional_claims=self.ADDITIONAL_CLAIMS,
  447. )
  448. assert credentials._signer.key_id == info["private_key_id"]
  449. assert credentials._issuer == info["client_email"]
  450. assert credentials._subject == self.SUBJECT
  451. assert credentials._additional_claims == self.ADDITIONAL_CLAIMS
  452. def test_from_signing_credentials(self):
  453. jwt_from_signing = self.credentials.from_signing_credentials(self.credentials)
  454. jwt_from_info = jwt.OnDemandCredentials.from_service_account_info(
  455. SERVICE_ACCOUNT_INFO
  456. )
  457. assert isinstance(jwt_from_signing, jwt.OnDemandCredentials)
  458. assert jwt_from_signing._signer.key_id == jwt_from_info._signer.key_id
  459. assert jwt_from_signing._issuer == jwt_from_info._issuer
  460. assert jwt_from_signing._subject == jwt_from_info._subject
  461. def test_default_state(self):
  462. # Credentials are *always* valid.
  463. assert self.credentials.valid
  464. # Credentials *never* expire.
  465. assert not self.credentials.expired
  466. def test_with_claims(self):
  467. new_claims = {"meep": "moop"}
  468. new_credentials = self.credentials.with_claims(additional_claims=new_claims)
  469. assert new_credentials._signer == self.credentials._signer
  470. assert new_credentials._issuer == self.credentials._issuer
  471. assert new_credentials._subject == self.credentials._subject
  472. assert new_credentials._additional_claims == new_claims
  473. def test_with_quota_project(self):
  474. quota_project_id = "project-foo"
  475. new_credentials = self.credentials.with_quota_project(quota_project_id)
  476. assert new_credentials._signer == self.credentials._signer
  477. assert new_credentials._issuer == self.credentials._issuer
  478. assert new_credentials._subject == self.credentials._subject
  479. assert new_credentials._additional_claims == self.credentials._additional_claims
  480. assert new_credentials._quota_project_id == quota_project_id
  481. def test_sign_bytes(self):
  482. to_sign = b"123"
  483. signature = self.credentials.sign_bytes(to_sign)
  484. assert crypt.verify_signature(to_sign, signature, PUBLIC_CERT_BYTES)
  485. def test_signer(self):
  486. assert isinstance(self.credentials.signer, crypt.RSASigner)
  487. def test_signer_email(self):
  488. assert self.credentials.signer_email == SERVICE_ACCOUNT_INFO["client_email"]
  489. def _verify_token(self, token):
  490. payload = jwt.decode(token, PUBLIC_CERT_BYTES)
  491. assert payload["iss"] == self.SERVICE_ACCOUNT_EMAIL
  492. return payload
  493. def test_refresh(self):
  494. with pytest.raises(exceptions.RefreshError):
  495. self.credentials.refresh(None)
  496. def test_before_request(self):
  497. headers = {}
  498. self.credentials.before_request(
  499. None, "GET", "http://example.com?a=1#3", headers
  500. )
  501. _, token = headers["authorization"].split(" ")
  502. payload = self._verify_token(token)
  503. assert payload["aud"] == "http://example.com"
  504. # Making another request should re-use the same token.
  505. self.credentials.before_request(None, "GET", "http://example.com?b=2", headers)
  506. _, new_token = headers["authorization"].split(" ")
  507. assert new_token == token
  508. def test_expired_token(self):
  509. self.credentials._cache["audience"] = (
  510. mock.sentinel.token,
  511. datetime.datetime.min,
  512. )
  513. token = self.credentials._get_jwt_for_audience("audience")
  514. assert token != mock.sentinel.token