test__mtls_helper.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443
  1. # Copyright 2020 Google LLC
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import os
  15. import re
  16. import mock
  17. from OpenSSL import crypto
  18. import pytest
  19. from google.auth import exceptions
  20. from google.auth.transport import _mtls_helper
  21. import yatest.common
  22. DATA_DIR = os.path.join(yatest.common.test_source_path(), "data")
  23. CONTEXT_AWARE_METADATA = {"cert_provider_command": ["some command"]}
  24. CONTEXT_AWARE_METADATA_NO_CERT_PROVIDER_COMMAND = {}
  25. ENCRYPTED_EC_PRIVATE_KEY = b"""-----BEGIN ENCRYPTED PRIVATE KEY-----
  26. MIHkME8GCSqGSIb3DQEFDTBCMCkGCSqGSIb3DQEFDDAcBAgl2/yVgs1h3QICCAAw
  27. DAYIKoZIhvcNAgkFADAVBgkrBgEEAZdVAQIECJk2GRrvxOaJBIGQXIBnMU4wmciT
  28. uA6yD8q0FxuIzjG7E2S6tc5VRgSbhRB00eBO3jWmO2pBybeQW+zVioDcn50zp2ts
  29. wYErWC+LCm1Zg3r+EGnT1E1GgNoODbVQ3AEHlKh1CGCYhEovxtn3G+Fjh7xOBrNB
  30. saVVeDb4tHD4tMkiVVUBrUcTZPndP73CtgyGHYEphasYPzEz3+AU
  31. -----END ENCRYPTED PRIVATE KEY-----"""
  32. EC_PUBLIC_KEY = b"""-----BEGIN PUBLIC KEY-----
  33. MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEvCNi1NoDY1oMqPHIgXI8RBbTYGi/
  34. brEjbre1nSiQW11xRTJbVeETdsuP0EAu2tG3PcRhhwDfeJ8zXREgTBurNw==
  35. -----END PUBLIC KEY-----"""
  36. PASSPHRASE = b"""-----BEGIN PASSPHRASE-----
  37. password
  38. -----END PASSPHRASE-----"""
  39. PASSPHRASE_VALUE = b"password"
  40. def check_cert_and_key(content, expected_cert, expected_key):
  41. success = True
  42. cert_match = re.findall(_mtls_helper._CERT_REGEX, content)
  43. success = success and len(cert_match) == 1 and cert_match[0] == expected_cert
  44. key_match = re.findall(_mtls_helper._KEY_REGEX, content)
  45. success = success and len(key_match) == 1 and key_match[0] == expected_key
  46. return success
  47. class TestCertAndKeyRegex(object):
  48. def test_cert_and_key(self):
  49. # Test single cert and single key
  50. check_cert_and_key(
  51. pytest.public_cert_bytes + pytest.private_key_bytes,
  52. pytest.public_cert_bytes,
  53. pytest.private_key_bytes,
  54. )
  55. check_cert_and_key(
  56. pytest.private_key_bytes + pytest.public_cert_bytes,
  57. pytest.public_cert_bytes,
  58. pytest.private_key_bytes,
  59. )
  60. # Test cert chain and single key
  61. check_cert_and_key(
  62. pytest.public_cert_bytes
  63. + pytest.public_cert_bytes
  64. + pytest.private_key_bytes,
  65. pytest.public_cert_bytes + pytest.public_cert_bytes,
  66. pytest.private_key_bytes,
  67. )
  68. check_cert_and_key(
  69. pytest.private_key_bytes
  70. + pytest.public_cert_bytes
  71. + pytest.public_cert_bytes,
  72. pytest.public_cert_bytes + pytest.public_cert_bytes,
  73. pytest.private_key_bytes,
  74. )
  75. def test_key(self):
  76. # Create some fake keys for regex check.
  77. KEY = b"""-----BEGIN PRIVATE KEY-----
  78. MIIBCgKCAQEA4ej0p7bQ7L/r4rVGUz9RN4VQWoej1Bg1mYWIDYslvKrk1gpj7wZg
  79. /fy3ZpsL7WqgsZS7Q+0VRK8gKfqkxg5OYQIDAQAB
  80. -----END PRIVATE KEY-----"""
  81. RSA_KEY = b"""-----BEGIN RSA PRIVATE KEY-----
  82. MIIBCgKCAQEA4ej0p7bQ7L/r4rVGUz9RN4VQWoej1Bg1mYWIDYslvKrk1gpj7wZg
  83. /fy3ZpsL7WqgsZS7Q+0VRK8gKfqkxg5OYQIDAQAB
  84. -----END RSA PRIVATE KEY-----"""
  85. EC_KEY = b"""-----BEGIN EC PRIVATE KEY-----
  86. MIIBCgKCAQEA4ej0p7bQ7L/r4rVGUz9RN4VQWoej1Bg1mYWIDYslvKrk1gpj7wZg
  87. /fy3ZpsL7WqgsZS7Q+0VRK8gKfqkxg5OYQIDAQAB
  88. -----END EC PRIVATE KEY-----"""
  89. check_cert_and_key(
  90. pytest.public_cert_bytes + KEY, pytest.public_cert_bytes, KEY
  91. )
  92. check_cert_and_key(
  93. pytest.public_cert_bytes + RSA_KEY, pytest.public_cert_bytes, RSA_KEY
  94. )
  95. check_cert_and_key(
  96. pytest.public_cert_bytes + EC_KEY, pytest.public_cert_bytes, EC_KEY
  97. )
  98. class TestCheckaMetadataPath(object):
  99. def test_success(self):
  100. metadata_path = os.path.join(DATA_DIR, "context_aware_metadata.json")
  101. returned_path = _mtls_helper._check_dca_metadata_path(metadata_path)
  102. assert returned_path is not None
  103. def test_failure(self):
  104. metadata_path = os.path.join(DATA_DIR, "not_exists.json")
  105. returned_path = _mtls_helper._check_dca_metadata_path(metadata_path)
  106. assert returned_path is None
  107. class TestReadMetadataFile(object):
  108. def test_success(self):
  109. metadata_path = os.path.join(DATA_DIR, "context_aware_metadata.json")
  110. metadata = _mtls_helper._read_dca_metadata_file(metadata_path)
  111. assert "cert_provider_command" in metadata
  112. def test_file_not_json(self):
  113. # read a file which is not json format.
  114. metadata_path = os.path.join(DATA_DIR, "privatekey.pem")
  115. with pytest.raises(exceptions.ClientCertError):
  116. _mtls_helper._read_dca_metadata_file(metadata_path)
  117. class TestRunCertProviderCommand(object):
  118. def create_mock_process(self, output, error):
  119. # There are two steps to execute a script with subprocess.Popen.
  120. # (1) process = subprocess.Popen([comannds])
  121. # (2) stdout, stderr = process.communicate()
  122. # This function creates a mock process which can be returned by a mock
  123. # subprocess.Popen. The mock process returns the given output and error
  124. # when mock_process.communicate() is called.
  125. mock_process = mock.Mock()
  126. attrs = {"communicate.return_value": (output, error), "returncode": 0}
  127. mock_process.configure_mock(**attrs)
  128. return mock_process
  129. @mock.patch("subprocess.Popen", autospec=True)
  130. def test_success(self, mock_popen):
  131. mock_popen.return_value = self.create_mock_process(
  132. pytest.public_cert_bytes + pytest.private_key_bytes, b""
  133. )
  134. cert, key, passphrase = _mtls_helper._run_cert_provider_command(["command"])
  135. assert cert == pytest.public_cert_bytes
  136. assert key == pytest.private_key_bytes
  137. assert passphrase is None
  138. mock_popen.return_value = self.create_mock_process(
  139. pytest.public_cert_bytes + ENCRYPTED_EC_PRIVATE_KEY + PASSPHRASE, b""
  140. )
  141. cert, key, passphrase = _mtls_helper._run_cert_provider_command(
  142. ["command"], expect_encrypted_key=True
  143. )
  144. assert cert == pytest.public_cert_bytes
  145. assert key == ENCRYPTED_EC_PRIVATE_KEY
  146. assert passphrase == PASSPHRASE_VALUE
  147. @mock.patch("subprocess.Popen", autospec=True)
  148. def test_success_with_cert_chain(self, mock_popen):
  149. PUBLIC_CERT_CHAIN_BYTES = pytest.public_cert_bytes + pytest.public_cert_bytes
  150. mock_popen.return_value = self.create_mock_process(
  151. PUBLIC_CERT_CHAIN_BYTES + pytest.private_key_bytes, b""
  152. )
  153. cert, key, passphrase = _mtls_helper._run_cert_provider_command(["command"])
  154. assert cert == PUBLIC_CERT_CHAIN_BYTES
  155. assert key == pytest.private_key_bytes
  156. assert passphrase is None
  157. mock_popen.return_value = self.create_mock_process(
  158. PUBLIC_CERT_CHAIN_BYTES + ENCRYPTED_EC_PRIVATE_KEY + PASSPHRASE, b""
  159. )
  160. cert, key, passphrase = _mtls_helper._run_cert_provider_command(
  161. ["command"], expect_encrypted_key=True
  162. )
  163. assert cert == PUBLIC_CERT_CHAIN_BYTES
  164. assert key == ENCRYPTED_EC_PRIVATE_KEY
  165. assert passphrase == PASSPHRASE_VALUE
  166. @mock.patch("subprocess.Popen", autospec=True)
  167. def test_missing_cert(self, mock_popen):
  168. mock_popen.return_value = self.create_mock_process(
  169. pytest.private_key_bytes, b""
  170. )
  171. with pytest.raises(exceptions.ClientCertError):
  172. _mtls_helper._run_cert_provider_command(["command"])
  173. mock_popen.return_value = self.create_mock_process(
  174. ENCRYPTED_EC_PRIVATE_KEY + PASSPHRASE, b""
  175. )
  176. with pytest.raises(exceptions.ClientCertError):
  177. _mtls_helper._run_cert_provider_command(
  178. ["command"], expect_encrypted_key=True
  179. )
  180. @mock.patch("subprocess.Popen", autospec=True)
  181. def test_missing_key(self, mock_popen):
  182. mock_popen.return_value = self.create_mock_process(
  183. pytest.public_cert_bytes, b""
  184. )
  185. with pytest.raises(exceptions.ClientCertError):
  186. _mtls_helper._run_cert_provider_command(["command"])
  187. mock_popen.return_value = self.create_mock_process(
  188. pytest.public_cert_bytes + PASSPHRASE, b""
  189. )
  190. with pytest.raises(exceptions.ClientCertError):
  191. _mtls_helper._run_cert_provider_command(
  192. ["command"], expect_encrypted_key=True
  193. )
  194. @mock.patch("subprocess.Popen", autospec=True)
  195. def test_missing_passphrase(self, mock_popen):
  196. mock_popen.return_value = self.create_mock_process(
  197. pytest.public_cert_bytes + ENCRYPTED_EC_PRIVATE_KEY, b""
  198. )
  199. with pytest.raises(exceptions.ClientCertError):
  200. _mtls_helper._run_cert_provider_command(
  201. ["command"], expect_encrypted_key=True
  202. )
  203. @mock.patch("subprocess.Popen", autospec=True)
  204. def test_passphrase_not_expected(self, mock_popen):
  205. mock_popen.return_value = self.create_mock_process(
  206. pytest.public_cert_bytes + pytest.private_key_bytes + PASSPHRASE, b""
  207. )
  208. with pytest.raises(exceptions.ClientCertError):
  209. _mtls_helper._run_cert_provider_command(["command"])
  210. @mock.patch("subprocess.Popen", autospec=True)
  211. def test_encrypted_key_expected(self, mock_popen):
  212. mock_popen.return_value = self.create_mock_process(
  213. pytest.public_cert_bytes + pytest.private_key_bytes + PASSPHRASE, b""
  214. )
  215. with pytest.raises(exceptions.ClientCertError):
  216. _mtls_helper._run_cert_provider_command(
  217. ["command"], expect_encrypted_key=True
  218. )
  219. @mock.patch("subprocess.Popen", autospec=True)
  220. def test_unencrypted_key_expected(self, mock_popen):
  221. mock_popen.return_value = self.create_mock_process(
  222. pytest.public_cert_bytes + ENCRYPTED_EC_PRIVATE_KEY, b""
  223. )
  224. with pytest.raises(exceptions.ClientCertError):
  225. _mtls_helper._run_cert_provider_command(["command"])
  226. @mock.patch("subprocess.Popen", autospec=True)
  227. def test_cert_provider_returns_error(self, mock_popen):
  228. mock_popen.return_value = self.create_mock_process(b"", b"some error")
  229. mock_popen.return_value.returncode = 1
  230. with pytest.raises(exceptions.ClientCertError):
  231. _mtls_helper._run_cert_provider_command(["command"])
  232. @mock.patch("subprocess.Popen", autospec=True)
  233. def test_popen_raise_exception(self, mock_popen):
  234. mock_popen.side_effect = OSError()
  235. with pytest.raises(exceptions.ClientCertError):
  236. _mtls_helper._run_cert_provider_command(["command"])
  237. class TestGetClientSslCredentials(object):
  238. @mock.patch(
  239. "google.auth.transport._mtls_helper._run_cert_provider_command", autospec=True
  240. )
  241. @mock.patch(
  242. "google.auth.transport._mtls_helper._read_dca_metadata_file", autospec=True
  243. )
  244. @mock.patch(
  245. "google.auth.transport._mtls_helper._check_dca_metadata_path", autospec=True
  246. )
  247. def test_success(
  248. self,
  249. mock_check_dca_metadata_path,
  250. mock_read_dca_metadata_file,
  251. mock_run_cert_provider_command,
  252. ):
  253. mock_check_dca_metadata_path.return_value = True
  254. mock_read_dca_metadata_file.return_value = {
  255. "cert_provider_command": ["command"]
  256. }
  257. mock_run_cert_provider_command.return_value = (b"cert", b"key", None)
  258. has_cert, cert, key, passphrase = _mtls_helper.get_client_ssl_credentials()
  259. assert has_cert
  260. assert cert == b"cert"
  261. assert key == b"key"
  262. assert passphrase is None
  263. @mock.patch(
  264. "google.auth.transport._mtls_helper._check_dca_metadata_path", autospec=True
  265. )
  266. def test_success_without_metadata(self, mock_check_dca_metadata_path):
  267. mock_check_dca_metadata_path.return_value = False
  268. has_cert, cert, key, passphrase = _mtls_helper.get_client_ssl_credentials()
  269. assert not has_cert
  270. assert cert is None
  271. assert key is None
  272. assert passphrase is None
  273. @mock.patch(
  274. "google.auth.transport._mtls_helper._run_cert_provider_command", autospec=True
  275. )
  276. @mock.patch(
  277. "google.auth.transport._mtls_helper._read_dca_metadata_file", autospec=True
  278. )
  279. @mock.patch(
  280. "google.auth.transport._mtls_helper._check_dca_metadata_path", autospec=True
  281. )
  282. def test_success_with_encrypted_key(
  283. self,
  284. mock_check_dca_metadata_path,
  285. mock_read_dca_metadata_file,
  286. mock_run_cert_provider_command,
  287. ):
  288. mock_check_dca_metadata_path.return_value = True
  289. mock_read_dca_metadata_file.return_value = {
  290. "cert_provider_command": ["command"]
  291. }
  292. mock_run_cert_provider_command.return_value = (b"cert", b"key", b"passphrase")
  293. has_cert, cert, key, passphrase = _mtls_helper.get_client_ssl_credentials(
  294. generate_encrypted_key=True
  295. )
  296. assert has_cert
  297. assert cert == b"cert"
  298. assert key == b"key"
  299. assert passphrase == b"passphrase"
  300. mock_run_cert_provider_command.assert_called_once_with(
  301. ["command", "--with_passphrase"], expect_encrypted_key=True
  302. )
  303. @mock.patch(
  304. "google.auth.transport._mtls_helper._read_dca_metadata_file", autospec=True
  305. )
  306. @mock.patch(
  307. "google.auth.transport._mtls_helper._check_dca_metadata_path", autospec=True
  308. )
  309. def test_missing_cert_command(
  310. self, mock_check_dca_metadata_path, mock_read_dca_metadata_file
  311. ):
  312. mock_check_dca_metadata_path.return_value = True
  313. mock_read_dca_metadata_file.return_value = {}
  314. with pytest.raises(exceptions.ClientCertError):
  315. _mtls_helper.get_client_ssl_credentials()
  316. @mock.patch(
  317. "google.auth.transport._mtls_helper._run_cert_provider_command", autospec=True
  318. )
  319. @mock.patch(
  320. "google.auth.transport._mtls_helper._read_dca_metadata_file", autospec=True
  321. )
  322. @mock.patch(
  323. "google.auth.transport._mtls_helper._check_dca_metadata_path", autospec=True
  324. )
  325. def test_customize_context_aware_metadata_path(
  326. self,
  327. mock_check_dca_metadata_path,
  328. mock_read_dca_metadata_file,
  329. mock_run_cert_provider_command,
  330. ):
  331. context_aware_metadata_path = "/path/to/metata/data"
  332. mock_check_dca_metadata_path.return_value = context_aware_metadata_path
  333. mock_read_dca_metadata_file.return_value = {
  334. "cert_provider_command": ["command"]
  335. }
  336. mock_run_cert_provider_command.return_value = (b"cert", b"key", None)
  337. has_cert, cert, key, passphrase = _mtls_helper.get_client_ssl_credentials(
  338. context_aware_metadata_path=context_aware_metadata_path
  339. )
  340. assert has_cert
  341. assert cert == b"cert"
  342. assert key == b"key"
  343. assert passphrase is None
  344. mock_check_dca_metadata_path.assert_called_with(context_aware_metadata_path)
  345. mock_read_dca_metadata_file.assert_called_with(context_aware_metadata_path)
  346. class TestGetClientCertAndKey(object):
  347. def test_callback_success(self):
  348. callback = mock.Mock()
  349. callback.return_value = (pytest.public_cert_bytes, pytest.private_key_bytes)
  350. found_cert_key, cert, key = _mtls_helper.get_client_cert_and_key(callback)
  351. assert found_cert_key
  352. assert cert == pytest.public_cert_bytes
  353. assert key == pytest.private_key_bytes
  354. @mock.patch(
  355. "google.auth.transport._mtls_helper.get_client_ssl_credentials", autospec=True
  356. )
  357. def test_use_metadata(self, mock_get_client_ssl_credentials):
  358. mock_get_client_ssl_credentials.return_value = (
  359. True,
  360. pytest.public_cert_bytes,
  361. pytest.private_key_bytes,
  362. None,
  363. )
  364. found_cert_key, cert, key = _mtls_helper.get_client_cert_and_key()
  365. assert found_cert_key
  366. assert cert == pytest.public_cert_bytes
  367. assert key == pytest.private_key_bytes
  368. class TestDecryptPrivateKey(object):
  369. def test_success(self):
  370. decrypted_key = _mtls_helper.decrypt_private_key(
  371. ENCRYPTED_EC_PRIVATE_KEY, PASSPHRASE_VALUE
  372. )
  373. private_key = crypto.load_privatekey(crypto.FILETYPE_PEM, decrypted_key)
  374. public_key = crypto.load_publickey(crypto.FILETYPE_PEM, EC_PUBLIC_KEY)
  375. x509 = crypto.X509()
  376. x509.set_pubkey(public_key)
  377. # Test the decrypted key works by signing and verification.
  378. signature = crypto.sign(private_key, b"data", "sha256")
  379. crypto.verify(x509, signature, b"data", "sha256")
  380. def test_crypto_error(self):
  381. with pytest.raises(crypto.Error):
  382. _mtls_helper.decrypt_private_key(
  383. ENCRYPTED_EC_PRIVATE_KEY, b"wrong_password"
  384. )