test__mtls_helper.py 26 KB


  1. # Copyright 2020 Google LLC
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import os
  15. import re
  16. import mock
  17. from OpenSSL import crypto
  18. import pytest # type: ignore
  19. from google.auth import exceptions
  20. from google.auth.transport import _mtls_helper
  21. CONTEXT_AWARE_METADATA = {"cert_provider_command": ["some command"]}
  22. ENCRYPTED_EC_PRIVATE_KEY = b"""-----BEGIN ENCRYPTED PRIVATE KEY-----
  23. MIHkME8GCSqGSIb3DQEFDTBCMCkGCSqGSIb3DQEFDDAcBAgl2/yVgs1h3QICCAAw
  24. DAYIKoZIhvcNAgkFADAVBgkrBgEEAZdVAQIECJk2GRrvxOaJBIGQXIBnMU4wmciT
  25. uA6yD8q0FxuIzjG7E2S6tc5VRgSbhRB00eBO3jWmO2pBybeQW+zVioDcn50zp2ts
  26. wYErWC+LCm1Zg3r+EGnT1E1GgNoODbVQ3AEHlKh1CGCYhEovxtn3G+Fjh7xOBrNB
  27. saVVeDb4tHD4tMkiVVUBrUcTZPndP73CtgyGHYEphasYPzEz3+AU
  28. -----END ENCRYPTED PRIVATE KEY-----"""
  29. EC_PUBLIC_KEY = b"""-----BEGIN PUBLIC KEY-----
  30. MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEvCNi1NoDY1oMqPHIgXI8RBbTYGi/
  31. brEjbre1nSiQW11xRTJbVeETdsuP0EAu2tG3PcRhhwDfeJ8zXREgTBurNw==
  32. -----END PUBLIC KEY-----"""
  33. PASSPHRASE = b"""-----BEGIN PASSPHRASE-----
  34. password
  35. -----END PASSPHRASE-----"""
  36. PASSPHRASE_VALUE = b"password"
  37. def check_cert_and_key(content, expected_cert, expected_key):
  38. success = True
  39. cert_match = re.findall(_mtls_helper._CERT_REGEX, content)
  40. success = success and len(cert_match) == 1 and cert_match[0] == expected_cert
  41. key_match = re.findall(_mtls_helper._KEY_REGEX, content)
  42. success = success and len(key_match) == 1 and key_match[0] == expected_key
  43. return success
  44. class TestCertAndKeyRegex(object):
  45. def test_cert_and_key(self):
  46. # Test single cert and single key
  47. check_cert_and_key(
  48. pytest.public_cert_bytes + pytest.private_key_bytes,
  49. pytest.public_cert_bytes,
  50. pytest.private_key_bytes,
  51. )
  52. check_cert_and_key(
  53. pytest.private_key_bytes + pytest.public_cert_bytes,
  54. pytest.public_cert_bytes,
  55. pytest.private_key_bytes,
  56. )
  57. # Test cert chain and single key
  58. check_cert_and_key(
  59. pytest.public_cert_bytes
  60. + pytest.public_cert_bytes
  61. + pytest.private_key_bytes,
  62. pytest.public_cert_bytes + pytest.public_cert_bytes,
  63. pytest.private_key_bytes,
  64. )
  65. check_cert_and_key(
  66. pytest.private_key_bytes
  67. + pytest.public_cert_bytes
  68. + pytest.public_cert_bytes,
  69. pytest.public_cert_bytes + pytest.public_cert_bytes,
  70. pytest.private_key_bytes,
  71. )
  72. def test_key(self):
  73. # Create some fake keys for regex check.
  74. KEY = b"""-----BEGIN PRIVATE KEY-----
  75. MIIBCgKCAQEA4ej0p7bQ7L/r4rVGUz9RN4VQWoej1Bg1mYWIDYslvKrk1gpj7wZg
  76. /fy3ZpsL7WqgsZS7Q+0VRK8gKfqkxg5OYQIDAQAB
  77. -----END PRIVATE KEY-----"""
  78. RSA_KEY = b"""-----BEGIN RSA PRIVATE KEY-----
  79. MIIBCgKCAQEA4ej0p7bQ7L/r4rVGUz9RN4VQWoej1Bg1mYWIDYslvKrk1gpj7wZg
  80. /fy3ZpsL7WqgsZS7Q+0VRK8gKfqkxg5OYQIDAQAB
  81. -----END RSA PRIVATE KEY-----"""
  82. EC_KEY = b"""-----BEGIN EC PRIVATE KEY-----
  83. MIIBCgKCAQEA4ej0p7bQ7L/r4rVGUz9RN4VQWoej1Bg1mYWIDYslvKrk1gpj7wZg
  84. /fy3ZpsL7WqgsZS7Q+0VRK8gKfqkxg5OYQIDAQAB
  85. -----END EC PRIVATE KEY-----"""
  86. check_cert_and_key(
  87. pytest.public_cert_bytes + KEY, pytest.public_cert_bytes, KEY
  88. )
  89. check_cert_and_key(
  90. pytest.public_cert_bytes + RSA_KEY, pytest.public_cert_bytes, RSA_KEY
  91. )
  92. check_cert_and_key(
  93. pytest.public_cert_bytes + EC_KEY, pytest.public_cert_bytes, EC_KEY
  94. )
  95. class TestCheckConfigPath(object):
  96. def test_success(self):
  97. metadata_path = os.path.join(pytest.data_dir, "context_aware_metadata.json")
  98. returned_path = _mtls_helper._check_config_path(metadata_path)
  99. assert returned_path is not None
  100. def test_failure(self):
  101. metadata_path = os.path.join(pytest.data_dir, "not_exists.json")
  102. returned_path = _mtls_helper._check_config_path(metadata_path)
  103. assert returned_path is None
  104. class TestReadMetadataFile(object):
  105. def test_success(self):
  106. metadata_path = os.path.join(pytest.data_dir, "context_aware_metadata.json")
  107. metadata = _mtls_helper._load_json_file(metadata_path)
  108. assert "cert_provider_command" in metadata
  109. def test_file_not_json(self):
  110. # read a file which is not json format.
  111. metadata_path = os.path.join(pytest.data_dir, "privatekey.pem")
  112. with pytest.raises(exceptions.ClientCertError):
  113. _mtls_helper._load_json_file(metadata_path)
  114. class TestRunCertProviderCommand(object):
  115. def create_mock_process(self, output, error):
  116. # There are two steps to execute a script with subprocess.Popen.
  117. # (1) process = subprocess.Popen([comannds])
  118. # (2) stdout, stderr = process.communicate()
  119. # This function creates a mock process which can be returned by a mock
  120. # subprocess.Popen. The mock process returns the given output and error
  121. # when mock_process.communicate() is called.
  122. mock_process = mock.Mock()
  123. attrs = {"communicate.return_value": (output, error), "returncode": 0}
  124. mock_process.configure_mock(**attrs)
  125. return mock_process
  126. @mock.patch("subprocess.Popen", autospec=True)
  127. def test_success(self, mock_popen):
  128. mock_popen.return_value = self.create_mock_process(
  129. pytest.public_cert_bytes + pytest.private_key_bytes, b""
  130. )
  131. cert, key, passphrase = _mtls_helper._run_cert_provider_command(["command"])
  132. assert cert == pytest.public_cert_bytes
  133. assert key == pytest.private_key_bytes
  134. assert passphrase is None
  135. mock_popen.return_value = self.create_mock_process(
  136. pytest.public_cert_bytes + ENCRYPTED_EC_PRIVATE_KEY + PASSPHRASE, b""
  137. )
  138. cert, key, passphrase = _mtls_helper._run_cert_provider_command(
  139. ["command"], expect_encrypted_key=True
  140. )
  141. assert cert == pytest.public_cert_bytes
  142. assert key == ENCRYPTED_EC_PRIVATE_KEY
  143. assert passphrase == PASSPHRASE_VALUE
  144. @mock.patch("subprocess.Popen", autospec=True)
  145. def test_success_with_cert_chain(self, mock_popen):
  146. PUBLIC_CERT_CHAIN_BYTES = pytest.public_cert_bytes + pytest.public_cert_bytes
  147. mock_popen.return_value = self.create_mock_process(
  148. PUBLIC_CERT_CHAIN_BYTES + pytest.private_key_bytes, b""
  149. )
  150. cert, key, passphrase = _mtls_helper._run_cert_provider_command(["command"])
  151. assert cert == PUBLIC_CERT_CHAIN_BYTES
  152. assert key == pytest.private_key_bytes
  153. assert passphrase is None
  154. mock_popen.return_value = self.create_mock_process(
  155. PUBLIC_CERT_CHAIN_BYTES + ENCRYPTED_EC_PRIVATE_KEY + PASSPHRASE, b""
  156. )
  157. cert, key, passphrase = _mtls_helper._run_cert_provider_command(
  158. ["command"], expect_encrypted_key=True
  159. )
  160. assert cert == PUBLIC_CERT_CHAIN_BYTES
  161. assert key == ENCRYPTED_EC_PRIVATE_KEY
  162. assert passphrase == PASSPHRASE_VALUE
  163. @mock.patch("subprocess.Popen", autospec=True)
  164. def test_missing_cert(self, mock_popen):
  165. mock_popen.return_value = self.create_mock_process(
  166. pytest.private_key_bytes, b""
  167. )
  168. with pytest.raises(exceptions.ClientCertError):
  169. _mtls_helper._run_cert_provider_command(["command"])
  170. mock_popen.return_value = self.create_mock_process(
  171. ENCRYPTED_EC_PRIVATE_KEY + PASSPHRASE, b""
  172. )
  173. with pytest.raises(exceptions.ClientCertError):
  174. _mtls_helper._run_cert_provider_command(
  175. ["command"], expect_encrypted_key=True
  176. )
  177. @mock.patch("subprocess.Popen", autospec=True)
  178. def test_missing_key(self, mock_popen):
  179. mock_popen.return_value = self.create_mock_process(
  180. pytest.public_cert_bytes, b""
  181. )
  182. with pytest.raises(exceptions.ClientCertError):
  183. _mtls_helper._run_cert_provider_command(["command"])
  184. mock_popen.return_value = self.create_mock_process(
  185. pytest.public_cert_bytes + PASSPHRASE, b""
  186. )
  187. with pytest.raises(exceptions.ClientCertError):
  188. _mtls_helper._run_cert_provider_command(
  189. ["command"], expect_encrypted_key=True
  190. )
  191. @mock.patch("subprocess.Popen", autospec=True)
  192. def test_missing_passphrase(self, mock_popen):
  193. mock_popen.return_value = self.create_mock_process(
  194. pytest.public_cert_bytes + ENCRYPTED_EC_PRIVATE_KEY, b""
  195. )
  196. with pytest.raises(exceptions.ClientCertError):
  197. _mtls_helper._run_cert_provider_command(
  198. ["command"], expect_encrypted_key=True
  199. )
  200. @mock.patch("subprocess.Popen", autospec=True)
  201. def test_passphrase_not_expected(self, mock_popen):
  202. mock_popen.return_value = self.create_mock_process(
  203. pytest.public_cert_bytes + pytest.private_key_bytes + PASSPHRASE, b""
  204. )
  205. with pytest.raises(exceptions.ClientCertError):
  206. _mtls_helper._run_cert_provider_command(["command"])
  207. @mock.patch("subprocess.Popen", autospec=True)
  208. def test_encrypted_key_expected(self, mock_popen):
  209. mock_popen.return_value = self.create_mock_process(
  210. pytest.public_cert_bytes + pytest.private_key_bytes + PASSPHRASE, b""
  211. )
  212. with pytest.raises(exceptions.ClientCertError):
  213. _mtls_helper._run_cert_provider_command(
  214. ["command"], expect_encrypted_key=True
  215. )
  216. @mock.patch("subprocess.Popen", autospec=True)
  217. def test_unencrypted_key_expected(self, mock_popen):
  218. mock_popen.return_value = self.create_mock_process(
  219. pytest.public_cert_bytes + ENCRYPTED_EC_PRIVATE_KEY, b""
  220. )
  221. with pytest.raises(exceptions.ClientCertError):
  222. _mtls_helper._run_cert_provider_command(["command"])
  223. @mock.patch("subprocess.Popen", autospec=True)
  224. def test_cert_provider_returns_error(self, mock_popen):
  225. mock_popen.return_value = self.create_mock_process(b"", b"some error")
  226. mock_popen.return_value.returncode = 1
  227. with pytest.raises(exceptions.ClientCertError):
  228. _mtls_helper._run_cert_provider_command(["command"])
  229. @mock.patch("subprocess.Popen", autospec=True)
  230. def test_popen_raise_exception(self, mock_popen):
  231. mock_popen.side_effect = OSError()
  232. with pytest.raises(exceptions.ClientCertError):
  233. _mtls_helper._run_cert_provider_command(["command"])
  234. class TestGetClientSslCredentials(object):
  235. @mock.patch(
  236. "google.auth.transport._mtls_helper._get_workload_cert_and_key", autospec=True
  237. )
  238. @mock.patch(
  239. "google.auth.transport._mtls_helper._run_cert_provider_command", autospec=True
  240. )
  241. @mock.patch("google.auth.transport._mtls_helper._load_json_file", autospec=True)
  242. @mock.patch("google.auth.transport._mtls_helper._check_config_path", autospec=True)
  243. def test_success_with_context_aware_metadata(
  244. self,
  245. mock_check_config_path,
  246. mock_load_json_file,
  247. mock_run_cert_provider_command,
  248. mock_get_workload_cert_and_key,
  249. ):
  250. mock_check_config_path.return_value = "/path/to/config"
  251. mock_load_json_file.return_value = {"cert_provider_command": ["command"]}
  252. mock_run_cert_provider_command.return_value = (b"cert", b"key", None)
  253. mock_get_workload_cert_and_key.return_value = (None, None)
  254. has_cert, cert, key, passphrase = _mtls_helper.get_client_ssl_credentials()
  255. assert has_cert
  256. assert cert == b"cert"
  257. assert key == b"key"
  258. assert passphrase is None
  259. @mock.patch(
  260. "google.auth.transport._mtls_helper._read_cert_and_key_files", autospec=True
  261. )
  262. @mock.patch(
  263. "google.auth.transport._mtls_helper._get_cert_config_path", autospec=True
  264. )
  265. @mock.patch("google.auth.transport._mtls_helper._load_json_file", autospec=True)
  266. @mock.patch("google.auth.transport._mtls_helper._check_config_path", autospec=True)
  267. def test_success_with_certificate_config(
  268. self,
  269. mock_check_config_path,
  270. mock_load_json_file,
  271. mock_get_cert_config_path,
  272. mock_read_cert_and_key_files,
  273. ):
  274. cert_config_path = "/path/to/config"
  275. mock_check_config_path.return_value = cert_config_path
  276. mock_load_json_file.return_value = {
  277. "cert_configs": {
  278. "workload": {"cert_path": "cert/path", "key_path": "key/path"}
  279. }
  280. }
  281. mock_get_cert_config_path.return_value = cert_config_path
  282. mock_read_cert_and_key_files.return_value = (
  283. pytest.public_cert_bytes,
  284. pytest.private_key_bytes,
  285. )
  286. has_cert, cert, key, passphrase = _mtls_helper.get_client_ssl_credentials()
  287. assert has_cert
  288. assert cert == pytest.public_cert_bytes
  289. assert key == pytest.private_key_bytes
  290. assert passphrase is None
  291. @mock.patch("google.auth.transport._mtls_helper._check_config_path", autospec=True)
  292. def test_success_without_metadata(self, mock_check_config_path):
  293. mock_check_config_path.return_value = False
  294. has_cert, cert, key, passphrase = _mtls_helper.get_client_ssl_credentials()
  295. assert not has_cert
  296. assert cert is None
  297. assert key is None
  298. assert passphrase is None
  299. @mock.patch(
  300. "google.auth.transport._mtls_helper._get_workload_cert_and_key", autospec=True
  301. )
  302. @mock.patch(
  303. "google.auth.transport._mtls_helper._run_cert_provider_command", autospec=True
  304. )
  305. @mock.patch("google.auth.transport._mtls_helper._load_json_file", autospec=True)
  306. @mock.patch("google.auth.transport._mtls_helper._check_config_path", autospec=True)
  307. def test_success_with_encrypted_key(
  308. self,
  309. mock_check_config_path,
  310. mock_load_json_file,
  311. mock_run_cert_provider_command,
  312. mock_get_workload_cert_and_key,
  313. ):
  314. mock_check_config_path.return_value = "/path/to/config"
  315. mock_load_json_file.return_value = {"cert_provider_command": ["command"]}
  316. mock_run_cert_provider_command.return_value = (b"cert", b"key", b"passphrase")
  317. mock_get_workload_cert_and_key.return_value = (None, None)
  318. has_cert, cert, key, passphrase = _mtls_helper.get_client_ssl_credentials(
  319. generate_encrypted_key=True
  320. )
  321. assert has_cert
  322. assert cert == b"cert"
  323. assert key == b"key"
  324. assert passphrase == b"passphrase"
  325. mock_run_cert_provider_command.assert_called_once_with(
  326. ["command", "--with_passphrase"], expect_encrypted_key=True
  327. )
  328. @mock.patch(
  329. "google.auth.transport._mtls_helper._get_workload_cert_and_key", autospec=True
  330. )
  331. @mock.patch("google.auth.transport._mtls_helper._load_json_file", autospec=True)
  332. @mock.patch("google.auth.transport._mtls_helper._check_config_path", autospec=True)
  333. def test_missing_cert_command(
  334. self,
  335. mock_check_config_path,
  336. mock_load_json_file,
  337. mock_get_workload_cert_and_key,
  338. ):
  339. mock_check_config_path.return_value = "/path/to/config"
  340. mock_load_json_file.return_value = {}
  341. mock_get_workload_cert_and_key.return_value = (None, None)
  342. with pytest.raises(exceptions.ClientCertError):
  343. _mtls_helper.get_client_ssl_credentials()
  344. @mock.patch(
  345. "google.auth.transport._mtls_helper._run_cert_provider_command", autospec=True
  346. )
  347. @mock.patch("google.auth.transport._mtls_helper._load_json_file", autospec=True)
  348. @mock.patch("google.auth.transport._mtls_helper._check_config_path", autospec=True)
  349. def test_customize_context_aware_metadata_path(
  350. self,
  351. mock_check_config_path,
  352. mock_load_json_file,
  353. mock_run_cert_provider_command,
  354. ):
  355. context_aware_metadata_path = "/path/to/metata/data"
  356. mock_check_config_path.return_value = context_aware_metadata_path
  357. mock_load_json_file.return_value = {"cert_provider_command": ["command"]}
  358. mock_run_cert_provider_command.return_value = (b"cert", b"key", None)
  359. has_cert, cert, key, passphrase = _mtls_helper.get_client_ssl_credentials(
  360. context_aware_metadata_path=context_aware_metadata_path
  361. )
  362. assert has_cert
  363. assert cert == b"cert"
  364. assert key == b"key"
  365. assert passphrase is None
  366. mock_check_config_path.assert_called_with(context_aware_metadata_path)
  367. mock_load_json_file.assert_called_with(context_aware_metadata_path)
  368. class TestGetWorkloadCertAndKey(object):
  369. @mock.patch("google.auth.transport._mtls_helper._load_json_file", autospec=True)
  370. @mock.patch(
  371. "google.auth.transport._mtls_helper._get_cert_config_path", autospec=True
  372. )
  373. @mock.patch(
  374. "google.auth.transport._mtls_helper._read_cert_and_key_files", autospec=True
  375. )
  376. def test_success(
  377. self,
  378. mock_read_cert_and_key_files,
  379. mock_get_cert_config_path,
  380. mock_load_json_file,
  381. ):
  382. cert_config_path = "/path/to/cert"
  383. mock_get_cert_config_path.return_value = "/path/to/cert"
  384. mock_load_json_file.return_value = {
  385. "cert_configs": {
  386. "workload": {"cert_path": "cert/path", "key_path": "key/path"}
  387. }
  388. }
  389. mock_read_cert_and_key_files.return_value = (
  390. pytest.public_cert_bytes,
  391. pytest.private_key_bytes,
  392. )
  393. actual_cert, actual_key = _mtls_helper._get_workload_cert_and_key(
  394. cert_config_path
  395. )
  396. assert actual_cert == pytest.public_cert_bytes
  397. assert actual_key == pytest.private_key_bytes
  398. @mock.patch(
  399. "google.auth.transport._mtls_helper._get_cert_config_path", autospec=True
  400. )
  401. def test_file_not_found_returns_none(self, mock_get_cert_config_path):
  402. mock_get_cert_config_path.return_value = None
  403. actual_cert, actual_key = _mtls_helper._get_workload_cert_and_key()
  404. assert actual_cert is None
  405. assert actual_key is None
  406. @mock.patch("google.auth.transport._mtls_helper._load_json_file", autospec=True)
  407. @mock.patch(
  408. "google.auth.transport._mtls_helper._get_cert_config_path", autospec=True
  409. )
  410. def test_no_cert_configs(self, mock_get_cert_config_path, mock_load_json_file):
  411. mock_get_cert_config_path.return_value = "/path/to/cert"
  412. mock_load_json_file.return_value = {}
  413. with pytest.raises(exceptions.ClientCertError):
  414. _mtls_helper._get_workload_cert_and_key("")
  415. @mock.patch("google.auth.transport._mtls_helper._load_json_file", autospec=True)
  416. @mock.patch(
  417. "google.auth.transport._mtls_helper._get_cert_config_path", autospec=True
  418. )
  419. def test_no_workload(self, mock_get_cert_config_path, mock_load_json_file):
  420. mock_get_cert_config_path.return_value = "/path/to/cert"
  421. mock_load_json_file.return_value = {"cert_configs": {}}
  422. with pytest.raises(exceptions.ClientCertError):
  423. _mtls_helper._get_workload_cert_and_key("")
  424. @mock.patch("google.auth.transport._mtls_helper._load_json_file", autospec=True)
  425. @mock.patch(
  426. "google.auth.transport._mtls_helper._get_cert_config_path", autospec=True
  427. )
  428. def test_no_cert_file(self, mock_get_cert_config_path, mock_load_json_file):
  429. mock_get_cert_config_path.return_value = "/path/to/cert"
  430. mock_load_json_file.return_value = {
  431. "cert_configs": {"workload": {"key_path": "path/to/key"}}
  432. }
  433. with pytest.raises(exceptions.ClientCertError):
  434. _mtls_helper._get_workload_cert_and_key("")
  435. @mock.patch("google.auth.transport._mtls_helper._load_json_file", autospec=True)
  436. @mock.patch(
  437. "google.auth.transport._mtls_helper._get_cert_config_path", autospec=True
  438. )
  439. def test_no_key_file(self, mock_get_cert_config_path, mock_load_json_file):
  440. mock_get_cert_config_path.return_value = "/path/to/cert"
  441. mock_load_json_file.return_value = {
  442. "cert_configs": {"workload": {"cert_path": "path/to/key"}}
  443. }
  444. with pytest.raises(exceptions.ClientCertError):
  445. _mtls_helper._get_workload_cert_and_key("")
  446. class TestReadCertAndKeyFile(object):
  447. def test_success(self):
  448. cert_path = os.path.join(pytest.data_dir, "public_cert.pem")
  449. key_path = os.path.join(pytest.data_dir, "privatekey.pem")
  450. actual_cert, actual_key = _mtls_helper._read_cert_and_key_files(
  451. cert_path, key_path
  452. )
  453. assert actual_cert == pytest.public_cert_bytes
  454. assert actual_key == pytest.private_key_bytes
  455. def test_no_cert_file(self):
  456. cert_path = "fake/file/path"
  457. key_path = os.path.join(pytest.data_dir, "privatekey.pem")
  458. with pytest.raises(FileNotFoundError):
  459. _mtls_helper._read_cert_and_key_files(cert_path, key_path)
  460. def test_no_key_file(self):
  461. cert_path = os.path.join(pytest.data_dir, "public_cert.pem")
  462. key_path = "fake/file/path"
  463. with pytest.raises(FileNotFoundError):
  464. _mtls_helper._read_cert_and_key_files(cert_path, key_path)
  465. def test_invalid_cert_file(self):
  466. cert_path = os.path.join(pytest.data_dir, "service_account.json")
  467. key_path = os.path.join(pytest.data_dir, "privatekey.pem")
  468. with pytest.raises(exceptions.ClientCertError):
  469. _mtls_helper._read_cert_and_key_files(cert_path, key_path)
  470. def test_invalid_key_file(self):
  471. cert_path = os.path.join(pytest.data_dir, "public_cert.pem")
  472. key_path = os.path.join(pytest.data_dir, "public_cert.pem")
  473. with pytest.raises(exceptions.ClientCertError):
  474. _mtls_helper._read_cert_and_key_files(cert_path, key_path)
  475. class TestGetCertConfigPath(object):
  476. def test_success_with_override(self):
  477. config_path = os.path.join(pytest.data_dir, "service_account.json")
  478. returned_path = _mtls_helper._get_cert_config_path(config_path)
  479. assert returned_path == config_path
  480. def test_override_does_not_exist(self):
  481. config_path = "fake/file/path"
  482. returned_path = _mtls_helper._get_cert_config_path(config_path)
  483. assert returned_path is None
  484. @mock.patch.dict(os.environ, {"GOOGLE_API_CERTIFICATE_CONFIG": ""})
  485. @mock.patch("os.path.exists", autospec=True)
  486. def test_default(self, mock_path_exists):
  487. mock_path_exists.return_value = True
  488. returned_path = _mtls_helper._get_cert_config_path()
  489. expected_path = os.path.expanduser(
  490. _mtls_helper.CERTIFICATE_CONFIGURATION_DEFAULT_PATH
  491. )
  492. assert returned_path == expected_path
  493. @mock.patch.dict(
  494. os.environ, {"GOOGLE_API_CERTIFICATE_CONFIG": "path/to/config/file"}
  495. )
  496. @mock.patch("os.path.exists", autospec=True)
  497. def test_env_variable(self, mock_path_exists):
  498. mock_path_exists.return_value = True
  499. returned_path = _mtls_helper._get_cert_config_path()
  500. expected_path = "path/to/config/file"
  501. assert returned_path == expected_path
  502. @mock.patch.dict(os.environ, {"GOOGLE_API_CERTIFICATE_CONFIG": ""})
  503. @mock.patch("os.path.exists", autospec=True)
  504. def test_env_variable_file_does_not_exist(self, mock_path_exists):
  505. mock_path_exists.return_value = False
  506. returned_path = _mtls_helper._get_cert_config_path()
  507. assert returned_path is None
  508. @mock.patch.dict(
  509. os.environ, {"GOOGLE_API_CERTIFICATE_CONFIG": "path/to/config/file"}
  510. )
  511. @mock.patch("os.path.exists", autospec=True)
  512. def test_default_file_does_not_exist(self, mock_path_exists):
  513. mock_path_exists.return_value = False
  514. returned_path = _mtls_helper._get_cert_config_path()
  515. assert returned_path is None
  516. class TestGetClientCertAndKey(object):
  517. def test_callback_success(self):
  518. callback = mock.Mock()
  519. callback.return_value = (pytest.public_cert_bytes, pytest.private_key_bytes)
  520. found_cert_key, cert, key = _mtls_helper.get_client_cert_and_key(callback)
  521. assert found_cert_key
  522. assert cert == pytest.public_cert_bytes
  523. assert key == pytest.private_key_bytes
  524. @mock.patch(
  525. "google.auth.transport._mtls_helper.get_client_ssl_credentials", autospec=True
  526. )
  527. def test_use_metadata(self, mock_get_client_ssl_credentials):
  528. mock_get_client_ssl_credentials.return_value = (
  529. True,
  530. pytest.public_cert_bytes,
  531. pytest.private_key_bytes,
  532. None,
  533. )
  534. found_cert_key, cert, key = _mtls_helper.get_client_cert_and_key()
  535. assert found_cert_key
  536. assert cert == pytest.public_cert_bytes
  537. assert key == pytest.private_key_bytes
  538. class TestDecryptPrivateKey(object):
  539. def test_success(self):
  540. decrypted_key = _mtls_helper.decrypt_private_key(
  541. ENCRYPTED_EC_PRIVATE_KEY, PASSPHRASE_VALUE
  542. )
  543. private_key = crypto.load_privatekey(crypto.FILETYPE_PEM, decrypted_key)
  544. public_key = crypto.load_publickey(crypto.FILETYPE_PEM, EC_PUBLIC_KEY)
  545. x509 = crypto.X509()
  546. x509.set_pubkey(public_key)
  547. # Test the decrypted key works by signing and verification.
  548. signature = crypto.sign(private_key, b"data", "sha256")
  549. crypto.verify(x509, signature, b"data", "sha256")
  550. def test_crypto_error(self):
  551. with pytest.raises(crypto.Error):
  552. _mtls_helper.decrypt_private_key(
  553. ENCRYPTED_EC_PRIVATE_KEY, b"wrong_password"
  554. )