test__mtls_helper.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438
  1. # Copyright 2020 Google LLC
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import os
  15. import re
  16. import mock
  17. from OpenSSL import crypto
  18. import pytest # type: ignore
  19. from google.auth import exceptions
  20. from google.auth.transport import _mtls_helper
  21. CONTEXT_AWARE_METADATA = {"cert_provider_command": ["some command"]}
  22. ENCRYPTED_EC_PRIVATE_KEY = b"""-----BEGIN ENCRYPTED PRIVATE KEY-----
  23. MIHkME8GCSqGSIb3DQEFDTBCMCkGCSqGSIb3DQEFDDAcBAgl2/yVgs1h3QICCAAw
  24. DAYIKoZIhvcNAgkFADAVBgkrBgEEAZdVAQIECJk2GRrvxOaJBIGQXIBnMU4wmciT
  25. uA6yD8q0FxuIzjG7E2S6tc5VRgSbhRB00eBO3jWmO2pBybeQW+zVioDcn50zp2ts
  26. wYErWC+LCm1Zg3r+EGnT1E1GgNoODbVQ3AEHlKh1CGCYhEovxtn3G+Fjh7xOBrNB
  27. saVVeDb4tHD4tMkiVVUBrUcTZPndP73CtgyGHYEphasYPzEz3+AU
  28. -----END ENCRYPTED PRIVATE KEY-----"""
  29. EC_PUBLIC_KEY = b"""-----BEGIN PUBLIC KEY-----
  30. MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEvCNi1NoDY1oMqPHIgXI8RBbTYGi/
  31. brEjbre1nSiQW11xRTJbVeETdsuP0EAu2tG3PcRhhwDfeJ8zXREgTBurNw==
  32. -----END PUBLIC KEY-----"""
  33. PASSPHRASE = b"""-----BEGIN PASSPHRASE-----
  34. password
  35. -----END PASSPHRASE-----"""
  36. PASSPHRASE_VALUE = b"password"
  37. def check_cert_and_key(content, expected_cert, expected_key):
  38. success = True
  39. cert_match = re.findall(_mtls_helper._CERT_REGEX, content)
  40. success = success and len(cert_match) == 1 and cert_match[0] == expected_cert
  41. key_match = re.findall(_mtls_helper._KEY_REGEX, content)
  42. success = success and len(key_match) == 1 and key_match[0] == expected_key
  43. return success
  44. class TestCertAndKeyRegex(object):
  45. def test_cert_and_key(self):
  46. # Test single cert and single key
  47. check_cert_and_key(
  48. pytest.public_cert_bytes + pytest.private_key_bytes,
  49. pytest.public_cert_bytes,
  50. pytest.private_key_bytes,
  51. )
  52. check_cert_and_key(
  53. pytest.private_key_bytes + pytest.public_cert_bytes,
  54. pytest.public_cert_bytes,
  55. pytest.private_key_bytes,
  56. )
  57. # Test cert chain and single key
  58. check_cert_and_key(
  59. pytest.public_cert_bytes
  60. + pytest.public_cert_bytes
  61. + pytest.private_key_bytes,
  62. pytest.public_cert_bytes + pytest.public_cert_bytes,
  63. pytest.private_key_bytes,
  64. )
  65. check_cert_and_key(
  66. pytest.private_key_bytes
  67. + pytest.public_cert_bytes
  68. + pytest.public_cert_bytes,
  69. pytest.public_cert_bytes + pytest.public_cert_bytes,
  70. pytest.private_key_bytes,
  71. )
  72. def test_key(self):
  73. # Create some fake keys for regex check.
  74. KEY = b"""-----BEGIN PRIVATE KEY-----
  75. MIIBCgKCAQEA4ej0p7bQ7L/r4rVGUz9RN4VQWoej1Bg1mYWIDYslvKrk1gpj7wZg
  76. /fy3ZpsL7WqgsZS7Q+0VRK8gKfqkxg5OYQIDAQAB
  77. -----END PRIVATE KEY-----"""
  78. RSA_KEY = b"""-----BEGIN RSA PRIVATE KEY-----
  79. MIIBCgKCAQEA4ej0p7bQ7L/r4rVGUz9RN4VQWoej1Bg1mYWIDYslvKrk1gpj7wZg
  80. /fy3ZpsL7WqgsZS7Q+0VRK8gKfqkxg5OYQIDAQAB
  81. -----END RSA PRIVATE KEY-----"""
  82. EC_KEY = b"""-----BEGIN EC PRIVATE KEY-----
  83. MIIBCgKCAQEA4ej0p7bQ7L/r4rVGUz9RN4VQWoej1Bg1mYWIDYslvKrk1gpj7wZg
  84. /fy3ZpsL7WqgsZS7Q+0VRK8gKfqkxg5OYQIDAQAB
  85. -----END EC PRIVATE KEY-----"""
  86. check_cert_and_key(
  87. pytest.public_cert_bytes + KEY, pytest.public_cert_bytes, KEY
  88. )
  89. check_cert_and_key(
  90. pytest.public_cert_bytes + RSA_KEY, pytest.public_cert_bytes, RSA_KEY
  91. )
  92. check_cert_and_key(
  93. pytest.public_cert_bytes + EC_KEY, pytest.public_cert_bytes, EC_KEY
  94. )
  95. class TestCheckaMetadataPath(object):
  96. def test_success(self):
  97. metadata_path = os.path.join(pytest.data_dir, "context_aware_metadata.json")
  98. returned_path = _mtls_helper._check_dca_metadata_path(metadata_path)
  99. assert returned_path is not None
  100. def test_failure(self):
  101. metadata_path = os.path.join(pytest.data_dir, "not_exists.json")
  102. returned_path = _mtls_helper._check_dca_metadata_path(metadata_path)
  103. assert returned_path is None
  104. class TestReadMetadataFile(object):
  105. def test_success(self):
  106. metadata_path = os.path.join(pytest.data_dir, "context_aware_metadata.json")
  107. metadata = _mtls_helper._read_dca_metadata_file(metadata_path)
  108. assert "cert_provider_command" in metadata
  109. def test_file_not_json(self):
  110. # read a file which is not json format.
  111. metadata_path = os.path.join(pytest.data_dir, "privatekey.pem")
  112. with pytest.raises(exceptions.ClientCertError):
  113. _mtls_helper._read_dca_metadata_file(metadata_path)
  114. class TestRunCertProviderCommand(object):
  115. def create_mock_process(self, output, error):
  116. # There are two steps to execute a script with subprocess.Popen.
  117. # (1) process = subprocess.Popen([comannds])
  118. # (2) stdout, stderr = process.communicate()
  119. # This function creates a mock process which can be returned by a mock
  120. # subprocess.Popen. The mock process returns the given output and error
  121. # when mock_process.communicate() is called.
  122. mock_process = mock.Mock()
  123. attrs = {"communicate.return_value": (output, error), "returncode": 0}
  124. mock_process.configure_mock(**attrs)
  125. return mock_process
  126. @mock.patch("subprocess.Popen", autospec=True)
  127. def test_success(self, mock_popen):
  128. mock_popen.return_value = self.create_mock_process(
  129. pytest.public_cert_bytes + pytest.private_key_bytes, b""
  130. )
  131. cert, key, passphrase = _mtls_helper._run_cert_provider_command(["command"])
  132. assert cert == pytest.public_cert_bytes
  133. assert key == pytest.private_key_bytes
  134. assert passphrase is None
  135. mock_popen.return_value = self.create_mock_process(
  136. pytest.public_cert_bytes + ENCRYPTED_EC_PRIVATE_KEY + PASSPHRASE, b""
  137. )
  138. cert, key, passphrase = _mtls_helper._run_cert_provider_command(
  139. ["command"], expect_encrypted_key=True
  140. )
  141. assert cert == pytest.public_cert_bytes
  142. assert key == ENCRYPTED_EC_PRIVATE_KEY
  143. assert passphrase == PASSPHRASE_VALUE
  144. @mock.patch("subprocess.Popen", autospec=True)
  145. def test_success_with_cert_chain(self, mock_popen):
  146. PUBLIC_CERT_CHAIN_BYTES = pytest.public_cert_bytes + pytest.public_cert_bytes
  147. mock_popen.return_value = self.create_mock_process(
  148. PUBLIC_CERT_CHAIN_BYTES + pytest.private_key_bytes, b""
  149. )
  150. cert, key, passphrase = _mtls_helper._run_cert_provider_command(["command"])
  151. assert cert == PUBLIC_CERT_CHAIN_BYTES
  152. assert key == pytest.private_key_bytes
  153. assert passphrase is None
  154. mock_popen.return_value = self.create_mock_process(
  155. PUBLIC_CERT_CHAIN_BYTES + ENCRYPTED_EC_PRIVATE_KEY + PASSPHRASE, b""
  156. )
  157. cert, key, passphrase = _mtls_helper._run_cert_provider_command(
  158. ["command"], expect_encrypted_key=True
  159. )
  160. assert cert == PUBLIC_CERT_CHAIN_BYTES
  161. assert key == ENCRYPTED_EC_PRIVATE_KEY
  162. assert passphrase == PASSPHRASE_VALUE
  163. @mock.patch("subprocess.Popen", autospec=True)
  164. def test_missing_cert(self, mock_popen):
  165. mock_popen.return_value = self.create_mock_process(
  166. pytest.private_key_bytes, b""
  167. )
  168. with pytest.raises(exceptions.ClientCertError):
  169. _mtls_helper._run_cert_provider_command(["command"])
  170. mock_popen.return_value = self.create_mock_process(
  171. ENCRYPTED_EC_PRIVATE_KEY + PASSPHRASE, b""
  172. )
  173. with pytest.raises(exceptions.ClientCertError):
  174. _mtls_helper._run_cert_provider_command(
  175. ["command"], expect_encrypted_key=True
  176. )
  177. @mock.patch("subprocess.Popen", autospec=True)
  178. def test_missing_key(self, mock_popen):
  179. mock_popen.return_value = self.create_mock_process(
  180. pytest.public_cert_bytes, b""
  181. )
  182. with pytest.raises(exceptions.ClientCertError):
  183. _mtls_helper._run_cert_provider_command(["command"])
  184. mock_popen.return_value = self.create_mock_process(
  185. pytest.public_cert_bytes + PASSPHRASE, b""
  186. )
  187. with pytest.raises(exceptions.ClientCertError):
  188. _mtls_helper._run_cert_provider_command(
  189. ["command"], expect_encrypted_key=True
  190. )
  191. @mock.patch("subprocess.Popen", autospec=True)
  192. def test_missing_passphrase(self, mock_popen):
  193. mock_popen.return_value = self.create_mock_process(
  194. pytest.public_cert_bytes + ENCRYPTED_EC_PRIVATE_KEY, b""
  195. )
  196. with pytest.raises(exceptions.ClientCertError):
  197. _mtls_helper._run_cert_provider_command(
  198. ["command"], expect_encrypted_key=True
  199. )
  200. @mock.patch("subprocess.Popen", autospec=True)
  201. def test_passphrase_not_expected(self, mock_popen):
  202. mock_popen.return_value = self.create_mock_process(
  203. pytest.public_cert_bytes + pytest.private_key_bytes + PASSPHRASE, b""
  204. )
  205. with pytest.raises(exceptions.ClientCertError):
  206. _mtls_helper._run_cert_provider_command(["command"])
  207. @mock.patch("subprocess.Popen", autospec=True)
  208. def test_encrypted_key_expected(self, mock_popen):
  209. mock_popen.return_value = self.create_mock_process(
  210. pytest.public_cert_bytes + pytest.private_key_bytes + PASSPHRASE, b""
  211. )
  212. with pytest.raises(exceptions.ClientCertError):
  213. _mtls_helper._run_cert_provider_command(
  214. ["command"], expect_encrypted_key=True
  215. )
  216. @mock.patch("subprocess.Popen", autospec=True)
  217. def test_unencrypted_key_expected(self, mock_popen):
  218. mock_popen.return_value = self.create_mock_process(
  219. pytest.public_cert_bytes + ENCRYPTED_EC_PRIVATE_KEY, b""
  220. )
  221. with pytest.raises(exceptions.ClientCertError):
  222. _mtls_helper._run_cert_provider_command(["command"])
  223. @mock.patch("subprocess.Popen", autospec=True)
  224. def test_cert_provider_returns_error(self, mock_popen):
  225. mock_popen.return_value = self.create_mock_process(b"", b"some error")
  226. mock_popen.return_value.returncode = 1
  227. with pytest.raises(exceptions.ClientCertError):
  228. _mtls_helper._run_cert_provider_command(["command"])
  229. @mock.patch("subprocess.Popen", autospec=True)
  230. def test_popen_raise_exception(self, mock_popen):
  231. mock_popen.side_effect = OSError()
  232. with pytest.raises(exceptions.ClientCertError):
  233. _mtls_helper._run_cert_provider_command(["command"])
  234. class TestGetClientSslCredentials(object):
  235. @mock.patch(
  236. "google.auth.transport._mtls_helper._run_cert_provider_command", autospec=True
  237. )
  238. @mock.patch(
  239. "google.auth.transport._mtls_helper._read_dca_metadata_file", autospec=True
  240. )
  241. @mock.patch(
  242. "google.auth.transport._mtls_helper._check_dca_metadata_path", autospec=True
  243. )
  244. def test_success(
  245. self,
  246. mock_check_dca_metadata_path,
  247. mock_read_dca_metadata_file,
  248. mock_run_cert_provider_command,
  249. ):
  250. mock_check_dca_metadata_path.return_value = True
  251. mock_read_dca_metadata_file.return_value = {
  252. "cert_provider_command": ["command"]
  253. }
  254. mock_run_cert_provider_command.return_value = (b"cert", b"key", None)
  255. has_cert, cert, key, passphrase = _mtls_helper.get_client_ssl_credentials()
  256. assert has_cert
  257. assert cert == b"cert"
  258. assert key == b"key"
  259. assert passphrase is None
  260. @mock.patch(
  261. "google.auth.transport._mtls_helper._check_dca_metadata_path", autospec=True
  262. )
  263. def test_success_without_metadata(self, mock_check_dca_metadata_path):
  264. mock_check_dca_metadata_path.return_value = False
  265. has_cert, cert, key, passphrase = _mtls_helper.get_client_ssl_credentials()
  266. assert not has_cert
  267. assert cert is None
  268. assert key is None
  269. assert passphrase is None
  270. @mock.patch(
  271. "google.auth.transport._mtls_helper._run_cert_provider_command", autospec=True
  272. )
  273. @mock.patch(
  274. "google.auth.transport._mtls_helper._read_dca_metadata_file", autospec=True
  275. )
  276. @mock.patch(
  277. "google.auth.transport._mtls_helper._check_dca_metadata_path", autospec=True
  278. )
  279. def test_success_with_encrypted_key(
  280. self,
  281. mock_check_dca_metadata_path,
  282. mock_read_dca_metadata_file,
  283. mock_run_cert_provider_command,
  284. ):
  285. mock_check_dca_metadata_path.return_value = True
  286. mock_read_dca_metadata_file.return_value = {
  287. "cert_provider_command": ["command"]
  288. }
  289. mock_run_cert_provider_command.return_value = (b"cert", b"key", b"passphrase")
  290. has_cert, cert, key, passphrase = _mtls_helper.get_client_ssl_credentials(
  291. generate_encrypted_key=True
  292. )
  293. assert has_cert
  294. assert cert == b"cert"
  295. assert key == b"key"
  296. assert passphrase == b"passphrase"
  297. mock_run_cert_provider_command.assert_called_once_with(
  298. ["command", "--with_passphrase"], expect_encrypted_key=True
  299. )
  300. @mock.patch(
  301. "google.auth.transport._mtls_helper._read_dca_metadata_file", autospec=True
  302. )
  303. @mock.patch(
  304. "google.auth.transport._mtls_helper._check_dca_metadata_path", autospec=True
  305. )
  306. def test_missing_cert_command(
  307. self, mock_check_dca_metadata_path, mock_read_dca_metadata_file
  308. ):
  309. mock_check_dca_metadata_path.return_value = True
  310. mock_read_dca_metadata_file.return_value = {}
  311. with pytest.raises(exceptions.ClientCertError):
  312. _mtls_helper.get_client_ssl_credentials()
  313. @mock.patch(
  314. "google.auth.transport._mtls_helper._run_cert_provider_command", autospec=True
  315. )
  316. @mock.patch(
  317. "google.auth.transport._mtls_helper._read_dca_metadata_file", autospec=True
  318. )
  319. @mock.patch(
  320. "google.auth.transport._mtls_helper._check_dca_metadata_path", autospec=True
  321. )
  322. def test_customize_context_aware_metadata_path(
  323. self,
  324. mock_check_dca_metadata_path,
  325. mock_read_dca_metadata_file,
  326. mock_run_cert_provider_command,
  327. ):
  328. context_aware_metadata_path = "/path/to/metata/data"
  329. mock_check_dca_metadata_path.return_value = context_aware_metadata_path
  330. mock_read_dca_metadata_file.return_value = {
  331. "cert_provider_command": ["command"]
  332. }
  333. mock_run_cert_provider_command.return_value = (b"cert", b"key", None)
  334. has_cert, cert, key, passphrase = _mtls_helper.get_client_ssl_credentials(
  335. context_aware_metadata_path=context_aware_metadata_path
  336. )
  337. assert has_cert
  338. assert cert == b"cert"
  339. assert key == b"key"
  340. assert passphrase is None
  341. mock_check_dca_metadata_path.assert_called_with(context_aware_metadata_path)
  342. mock_read_dca_metadata_file.assert_called_with(context_aware_metadata_path)
  343. class TestGetClientCertAndKey(object):
  344. def test_callback_success(self):
  345. callback = mock.Mock()
  346. callback.return_value = (pytest.public_cert_bytes, pytest.private_key_bytes)
  347. found_cert_key, cert, key = _mtls_helper.get_client_cert_and_key(callback)
  348. assert found_cert_key
  349. assert cert == pytest.public_cert_bytes
  350. assert key == pytest.private_key_bytes
  351. @mock.patch(
  352. "google.auth.transport._mtls_helper.get_client_ssl_credentials", autospec=True
  353. )
  354. def test_use_metadata(self, mock_get_client_ssl_credentials):
  355. mock_get_client_ssl_credentials.return_value = (
  356. True,
  357. pytest.public_cert_bytes,
  358. pytest.private_key_bytes,
  359. None,
  360. )
  361. found_cert_key, cert, key = _mtls_helper.get_client_cert_and_key()
  362. assert found_cert_key
  363. assert cert == pytest.public_cert_bytes
  364. assert key == pytest.private_key_bytes
  365. class TestDecryptPrivateKey(object):
  366. def test_success(self):
  367. decrypted_key = _mtls_helper.decrypt_private_key(
  368. ENCRYPTED_EC_PRIVATE_KEY, PASSPHRASE_VALUE
  369. )
  370. private_key = crypto.load_privatekey(crypto.FILETYPE_PEM, decrypted_key)
  371. public_key = crypto.load_publickey(crypto.FILETYPE_PEM, EC_PUBLIC_KEY)
  372. x509 = crypto.X509()
  373. x509.set_pubkey(public_key)
  374. # Test the decrypted key works by signing and verification.
  375. signature = crypto.sign(private_key, b"data", "sha256")
  376. crypto.verify(x509, signature, b"data", "sha256")
  377. def test_crypto_error(self):
  378. with pytest.raises(crypto.Error):
  379. _mtls_helper.decrypt_private_key(
  380. ENCRYPTED_EC_PRIVATE_KEY, b"wrong_password"
  381. )