test__metadata.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529
# Copyright 2016 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
  14. import datetime
  15. import http.client as http_client
  16. import importlib
  17. import json
  18. import os
  19. import mock
  20. import pytest # type: ignore
  21. from google.auth import _helpers
  22. from google.auth import environment_vars
  23. from google.auth import exceptions
  24. from google.auth import transport
  25. from google.auth.compute_engine import _metadata
  26. PATH = "instance/service-accounts/default"
  27. DATA_DIR = os.path.join(os.path.dirname(__file__), "data")
  28. SMBIOS_PRODUCT_NAME_FILE = os.path.join(DATA_DIR, "smbios_product_name")
  29. SMBIOS_PRODUCT_NAME_NONEXISTENT_FILE = os.path.join(
  30. DATA_DIR, "smbios_product_name_nonexistent"
  31. )
  32. SMBIOS_PRODUCT_NAME_NON_GOOGLE = os.path.join(
  33. DATA_DIR, "smbios_product_name_non_google"
  34. )
  35. ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE = (
  36. "gl-python/3.7 auth/1.1 auth-request-type/at cred-type/mds"
  37. )
  38. MDS_PING_METRICS_HEADER_VALUE = "gl-python/3.7 auth/1.1 auth-request-type/mds"
  39. MDS_PING_REQUEST_HEADER = {
  40. "metadata-flavor": "Google",
  41. "x-goog-api-client": MDS_PING_METRICS_HEADER_VALUE,
  42. }
  43. def make_request(data, status=http_client.OK, headers=None, retry=False):
  44. response = mock.create_autospec(transport.Response, instance=True)
  45. response.status = status
  46. response.data = _helpers.to_bytes(data)
  47. response.headers = headers or {}
  48. request = mock.create_autospec(transport.Request)
  49. if retry:
  50. request.side_effect = [exceptions.TransportError(), response]
  51. else:
  52. request.return_value = response
  53. return request
  54. @pytest.mark.xfail
  55. def test_detect_gce_residency_linux_success():
  56. _metadata._GCE_PRODUCT_NAME_FILE = SMBIOS_PRODUCT_NAME_FILE
  57. assert _metadata.detect_gce_residency_linux()
  58. def test_detect_gce_residency_linux_non_google():
  59. _metadata._GCE_PRODUCT_NAME_FILE = SMBIOS_PRODUCT_NAME_NON_GOOGLE
  60. assert not _metadata.detect_gce_residency_linux()
  61. def test_detect_gce_residency_linux_nonexistent():
  62. _metadata._GCE_PRODUCT_NAME_FILE = SMBIOS_PRODUCT_NAME_NONEXISTENT_FILE
  63. assert not _metadata.detect_gce_residency_linux()
  64. def test_is_on_gce_ping_success():
  65. request = make_request("", headers=_metadata._METADATA_HEADERS)
  66. assert _metadata.is_on_gce(request)
  67. @mock.patch("os.name", new="nt")
  68. def test_is_on_gce_windows_success():
  69. request = make_request("", headers={_metadata._METADATA_FLAVOR_HEADER: "meep"})
  70. assert not _metadata.is_on_gce(request)
  71. @pytest.mark.xfail
  72. @mock.patch("os.name", new="posix")
  73. def test_is_on_gce_linux_success():
  74. request = make_request("", headers={_metadata._METADATA_FLAVOR_HEADER: "meep"})
  75. _metadata._GCE_PRODUCT_NAME_FILE = SMBIOS_PRODUCT_NAME_FILE
  76. assert _metadata.is_on_gce(request)
  77. @mock.patch("google.auth.metrics.mds_ping", return_value=MDS_PING_METRICS_HEADER_VALUE)
  78. def test_ping_success(mock_metrics_header_value):
  79. request = make_request("", headers=_metadata._METADATA_HEADERS)
  80. assert _metadata.ping(request)
  81. request.assert_called_once_with(
  82. method="GET",
  83. url=_metadata._METADATA_IP_ROOT,
  84. headers=MDS_PING_REQUEST_HEADER,
  85. timeout=_metadata._METADATA_DEFAULT_TIMEOUT,
  86. )
  87. @mock.patch("google.auth.metrics.mds_ping", return_value=MDS_PING_METRICS_HEADER_VALUE)
  88. def test_ping_success_retry(mock_metrics_header_value):
  89. request = make_request("", headers=_metadata._METADATA_HEADERS, retry=True)
  90. assert _metadata.ping(request)
  91. request.assert_called_with(
  92. method="GET",
  93. url=_metadata._METADATA_IP_ROOT,
  94. headers=MDS_PING_REQUEST_HEADER,
  95. timeout=_metadata._METADATA_DEFAULT_TIMEOUT,
  96. )
  97. assert request.call_count == 2
  98. def test_ping_failure_bad_flavor():
  99. request = make_request("", headers={_metadata._METADATA_FLAVOR_HEADER: "meep"})
  100. assert not _metadata.ping(request)
  101. def test_ping_failure_connection_failed():
  102. request = make_request("")
  103. request.side_effect = exceptions.TransportError()
  104. assert not _metadata.ping(request)
  105. @mock.patch("google.auth.metrics.mds_ping", return_value=MDS_PING_METRICS_HEADER_VALUE)
  106. def _test_ping_success_custom_root(mock_metrics_header_value):
  107. request = make_request("", headers=_metadata._METADATA_HEADERS)
  108. fake_ip = "1.2.3.4"
  109. os.environ[environment_vars.GCE_METADATA_IP] = fake_ip
  110. importlib.reload(_metadata)
  111. try:
  112. assert _metadata.ping(request)
  113. finally:
  114. del os.environ[environment_vars.GCE_METADATA_IP]
  115. importlib.reload(_metadata)
  116. request.assert_called_once_with(
  117. method="GET",
  118. url="http://" + fake_ip,
  119. headers=MDS_PING_REQUEST_HEADER,
  120. timeout=_metadata._METADATA_DEFAULT_TIMEOUT,
  121. )
  122. def test_get_success_json():
  123. key, value = "foo", "bar"
  124. data = json.dumps({key: value})
  125. request = make_request(data, headers={"content-type": "application/json"})
  126. result = _metadata.get(request, PATH)
  127. request.assert_called_once_with(
  128. method="GET",
  129. url=_metadata._METADATA_ROOT + PATH,
  130. headers=_metadata._METADATA_HEADERS,
  131. )
  132. assert result[key] == value
  133. def test_get_success_json_content_type_charset():
  134. key, value = "foo", "bar"
  135. data = json.dumps({key: value})
  136. request = make_request(
  137. data, headers={"content-type": "application/json; charset=UTF-8"}
  138. )
  139. result = _metadata.get(request, PATH)
  140. request.assert_called_once_with(
  141. method="GET",
  142. url=_metadata._METADATA_ROOT + PATH,
  143. headers=_metadata._METADATA_HEADERS,
  144. )
  145. assert result[key] == value
  146. def test_get_success_retry():
  147. key, value = "foo", "bar"
  148. data = json.dumps({key: value})
  149. request = make_request(
  150. data, headers={"content-type": "application/json"}, retry=True
  151. )
  152. result = _metadata.get(request, PATH)
  153. request.assert_called_with(
  154. method="GET",
  155. url=_metadata._METADATA_ROOT + PATH,
  156. headers=_metadata._METADATA_HEADERS,
  157. )
  158. assert request.call_count == 2
  159. assert result[key] == value
  160. def test_get_success_text():
  161. data = "foobar"
  162. request = make_request(data, headers={"content-type": "text/plain"})
  163. result = _metadata.get(request, PATH)
  164. request.assert_called_once_with(
  165. method="GET",
  166. url=_metadata._METADATA_ROOT + PATH,
  167. headers=_metadata._METADATA_HEADERS,
  168. )
  169. assert result == data
  170. def test_get_success_params():
  171. data = "foobar"
  172. request = make_request(data, headers={"content-type": "text/plain"})
  173. params = {"recursive": "true"}
  174. result = _metadata.get(request, PATH, params=params)
  175. request.assert_called_once_with(
  176. method="GET",
  177. url=_metadata._METADATA_ROOT + PATH + "?recursive=true",
  178. headers=_metadata._METADATA_HEADERS,
  179. )
  180. assert result == data
  181. def test_get_success_recursive_and_params():
  182. data = "foobar"
  183. request = make_request(data, headers={"content-type": "text/plain"})
  184. params = {"recursive": "false"}
  185. result = _metadata.get(request, PATH, recursive=True, params=params)
  186. request.assert_called_once_with(
  187. method="GET",
  188. url=_metadata._METADATA_ROOT + PATH + "?recursive=true",
  189. headers=_metadata._METADATA_HEADERS,
  190. )
  191. assert result == data
  192. def test_get_success_recursive():
  193. data = "foobar"
  194. request = make_request(data, headers={"content-type": "text/plain"})
  195. result = _metadata.get(request, PATH, recursive=True)
  196. request.assert_called_once_with(
  197. method="GET",
  198. url=_metadata._METADATA_ROOT + PATH + "?recursive=true",
  199. headers=_metadata._METADATA_HEADERS,
  200. )
  201. assert result == data
  202. def _test_get_success_custom_root_new_variable():
  203. request = make_request("{}", headers={"content-type": "application/json"})
  204. fake_root = "another.metadata.service"
  205. os.environ[environment_vars.GCE_METADATA_HOST] = fake_root
  206. importlib.reload(_metadata)
  207. try:
  208. _metadata.get(request, PATH)
  209. finally:
  210. del os.environ[environment_vars.GCE_METADATA_HOST]
  211. importlib.reload(_metadata)
  212. request.assert_called_once_with(
  213. method="GET",
  214. url="http://{}/computeMetadata/v1/{}".format(fake_root, PATH),
  215. headers=_metadata._METADATA_HEADERS,
  216. )
  217. def _test_get_success_custom_root_old_variable():
  218. request = make_request("{}", headers={"content-type": "application/json"})
  219. fake_root = "another.metadata.service"
  220. os.environ[environment_vars.GCE_METADATA_ROOT] = fake_root
  221. importlib.reload(_metadata)
  222. try:
  223. _metadata.get(request, PATH)
  224. finally:
  225. del os.environ[environment_vars.GCE_METADATA_ROOT]
  226. importlib.reload(_metadata)
  227. request.assert_called_once_with(
  228. method="GET",
  229. url="http://{}/computeMetadata/v1/{}".format(fake_root, PATH),
  230. headers=_metadata._METADATA_HEADERS,
  231. )
  232. def test_get_failure():
  233. request = make_request("Metadata error", status=http_client.NOT_FOUND)
  234. with pytest.raises(exceptions.TransportError) as excinfo:
  235. _metadata.get(request, PATH)
  236. assert excinfo.match(r"Metadata error")
  237. request.assert_called_once_with(
  238. method="GET",
  239. url=_metadata._METADATA_ROOT + PATH,
  240. headers=_metadata._METADATA_HEADERS,
  241. )
  242. def test_get_return_none_for_not_found_error():
  243. request = make_request("Metadata error", status=http_client.NOT_FOUND)
  244. assert _metadata.get(request, PATH, return_none_for_not_found_error=True) is None
  245. request.assert_called_once_with(
  246. method="GET",
  247. url=_metadata._METADATA_ROOT + PATH,
  248. headers=_metadata._METADATA_HEADERS,
  249. )
  250. def test_get_failure_connection_failed():
  251. request = make_request("")
  252. request.side_effect = exceptions.TransportError()
  253. with pytest.raises(exceptions.TransportError) as excinfo:
  254. _metadata.get(request, PATH)
  255. assert excinfo.match(r"Compute Engine Metadata server unavailable")
  256. request.assert_called_with(
  257. method="GET",
  258. url=_metadata._METADATA_ROOT + PATH,
  259. headers=_metadata._METADATA_HEADERS,
  260. )
  261. assert request.call_count == 5
  262. def test_get_failure_bad_json():
  263. request = make_request("{", headers={"content-type": "application/json"})
  264. with pytest.raises(exceptions.TransportError) as excinfo:
  265. _metadata.get(request, PATH)
  266. assert excinfo.match(r"invalid JSON")
  267. request.assert_called_once_with(
  268. method="GET",
  269. url=_metadata._METADATA_ROOT + PATH,
  270. headers=_metadata._METADATA_HEADERS,
  271. )
  272. def test_get_project_id():
  273. project = "example-project"
  274. request = make_request(project, headers={"content-type": "text/plain"})
  275. project_id = _metadata.get_project_id(request)
  276. request.assert_called_once_with(
  277. method="GET",
  278. url=_metadata._METADATA_ROOT + "project/project-id",
  279. headers=_metadata._METADATA_HEADERS,
  280. )
  281. assert project_id == project
  282. def test_get_universe_domain_success():
  283. request = make_request(
  284. "fake_universe_domain", headers={"content-type": "text/plain"}
  285. )
  286. universe_domain = _metadata.get_universe_domain(request)
  287. request.assert_called_once_with(
  288. method="GET",
  289. url=_metadata._METADATA_ROOT + "universe/universe_domain",
  290. headers=_metadata._METADATA_HEADERS,
  291. )
  292. assert universe_domain == "fake_universe_domain"
  293. def test_get_universe_domain_not_found():
  294. # Test that if the universe domain endpoint returns 404 error, we should
  295. # use googleapis.com as the universe domain
  296. request = make_request("not found", status=http_client.NOT_FOUND)
  297. universe_domain = _metadata.get_universe_domain(request)
  298. request.assert_called_once_with(
  299. method="GET",
  300. url=_metadata._METADATA_ROOT + "universe/universe_domain",
  301. headers=_metadata._METADATA_HEADERS,
  302. )
  303. assert universe_domain == "googleapis.com"
  304. def test_get_universe_domain_other_error():
  305. # Test that if the universe domain endpoint returns an error other than 404
  306. # we should throw the error
  307. request = make_request("unauthorized", status=http_client.UNAUTHORIZED)
  308. with pytest.raises(exceptions.TransportError) as excinfo:
  309. _metadata.get_universe_domain(request)
  310. assert excinfo.match(r"unauthorized")
  311. request.assert_called_once_with(
  312. method="GET",
  313. url=_metadata._METADATA_ROOT + "universe/universe_domain",
  314. headers=_metadata._METADATA_HEADERS,
  315. )
  316. @mock.patch(
  317. "google.auth.metrics.token_request_access_token_mds",
  318. return_value=ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
  319. )
  320. @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
  321. def test_get_service_account_token(utcnow, mock_metrics_header_value):
  322. ttl = 500
  323. request = make_request(
  324. json.dumps({"access_token": "token", "expires_in": ttl}),
  325. headers={"content-type": "application/json"},
  326. )
  327. token, expiry = _metadata.get_service_account_token(request)
  328. request.assert_called_once_with(
  329. method="GET",
  330. url=_metadata._METADATA_ROOT + PATH + "/token",
  331. headers={
  332. "metadata-flavor": "Google",
  333. "x-goog-api-client": ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
  334. },
  335. )
  336. assert token == "token"
  337. assert expiry == utcnow() + datetime.timedelta(seconds=ttl)
  338. @mock.patch(
  339. "google.auth.metrics.token_request_access_token_mds",
  340. return_value=ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
  341. )
  342. @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
  343. def test_get_service_account_token_with_scopes_list(utcnow, mock_metrics_header_value):
  344. ttl = 500
  345. request = make_request(
  346. json.dumps({"access_token": "token", "expires_in": ttl}),
  347. headers={"content-type": "application/json"},
  348. )
  349. token, expiry = _metadata.get_service_account_token(request, scopes=["foo", "bar"])
  350. request.assert_called_once_with(
  351. method="GET",
  352. url=_metadata._METADATA_ROOT + PATH + "/token" + "?scopes=foo%2Cbar",
  353. headers={
  354. "metadata-flavor": "Google",
  355. "x-goog-api-client": ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
  356. },
  357. )
  358. assert token == "token"
  359. assert expiry == utcnow() + datetime.timedelta(seconds=ttl)
  360. @mock.patch(
  361. "google.auth.metrics.token_request_access_token_mds",
  362. return_value=ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
  363. )
  364. @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
  365. def test_get_service_account_token_with_scopes_string(
  366. utcnow, mock_metrics_header_value
  367. ):
  368. ttl = 500
  369. request = make_request(
  370. json.dumps({"access_token": "token", "expires_in": ttl}),
  371. headers={"content-type": "application/json"},
  372. )
  373. token, expiry = _metadata.get_service_account_token(request, scopes="foo,bar")
  374. request.assert_called_once_with(
  375. method="GET",
  376. url=_metadata._METADATA_ROOT + PATH + "/token" + "?scopes=foo%2Cbar",
  377. headers={
  378. "metadata-flavor": "Google",
  379. "x-goog-api-client": ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
  380. },
  381. )
  382. assert token == "token"
  383. assert expiry == utcnow() + datetime.timedelta(seconds=ttl)
  384. def test_get_service_account_info():
  385. key, value = "foo", "bar"
  386. request = make_request(
  387. json.dumps({key: value}), headers={"content-type": "application/json"}
  388. )
  389. info = _metadata.get_service_account_info(request)
  390. request.assert_called_once_with(
  391. method="GET",
  392. url=_metadata._METADATA_ROOT + PATH + "/?recursive=true",
  393. headers=_metadata._METADATA_HEADERS,
  394. )
  395. assert info[key] == value