# Copyright 2020 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import os import re import mock from OpenSSL import crypto import pytest from google.auth import exceptions from google.auth.transport import _mtls_helper import yatest.common DATA_DIR = os.path.join(yatest.common.test_source_path(), "data") CONTEXT_AWARE_METADATA = {"cert_provider_command": ["some command"]} CONTEXT_AWARE_METADATA_NO_CERT_PROVIDER_COMMAND = {} ENCRYPTED_EC_PRIVATE_KEY = b"""-----BEGIN ENCRYPTED PRIVATE KEY----- MIHkME8GCSqGSIb3DQEFDTBCMCkGCSqGSIb3DQEFDDAcBAgl2/yVgs1h3QICCAAw DAYIKoZIhvcNAgkFADAVBgkrBgEEAZdVAQIECJk2GRrvxOaJBIGQXIBnMU4wmciT uA6yD8q0FxuIzjG7E2S6tc5VRgSbhRB00eBO3jWmO2pBybeQW+zVioDcn50zp2ts wYErWC+LCm1Zg3r+EGnT1E1GgNoODbVQ3AEHlKh1CGCYhEovxtn3G+Fjh7xOBrNB saVVeDb4tHD4tMkiVVUBrUcTZPndP73CtgyGHYEphasYPzEz3+AU -----END ENCRYPTED PRIVATE KEY-----""" EC_PUBLIC_KEY = b"""-----BEGIN PUBLIC KEY----- MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEvCNi1NoDY1oMqPHIgXI8RBbTYGi/ brEjbre1nSiQW11xRTJbVeETdsuP0EAu2tG3PcRhhwDfeJ8zXREgTBurNw== -----END PUBLIC KEY-----""" PASSPHRASE = b"""-----BEGIN PASSPHRASE----- password -----END PASSPHRASE-----""" PASSPHRASE_VALUE = b"password" def check_cert_and_key(content, expected_cert, expected_key): success = True cert_match = re.findall(_mtls_helper._CERT_REGEX, content) success = success and len(cert_match) == 1 and cert_match[0] == expected_cert key_match = re.findall(_mtls_helper._KEY_REGEX, content) success = success and len(key_match) == 1 and key_match[0] == expected_key return success class TestCertAndKeyRegex(object): def test_cert_and_key(self): # Test single cert and single key check_cert_and_key( pytest.public_cert_bytes + pytest.private_key_bytes, pytest.public_cert_bytes, pytest.private_key_bytes, ) check_cert_and_key( pytest.private_key_bytes + pytest.public_cert_bytes, pytest.public_cert_bytes, pytest.private_key_bytes, ) # Test cert chain and single key check_cert_and_key( pytest.public_cert_bytes + pytest.public_cert_bytes + pytest.private_key_bytes, pytest.public_cert_bytes + pytest.public_cert_bytes, pytest.private_key_bytes, ) check_cert_and_key( pytest.private_key_bytes + pytest.public_cert_bytes + pytest.public_cert_bytes, pytest.public_cert_bytes + pytest.public_cert_bytes, pytest.private_key_bytes, ) def test_key(self): # Create some fake keys for regex check. KEY = b"""-----BEGIN PRIVATE KEY----- MIIBCgKCAQEA4ej0p7bQ7L/r4rVGUz9RN4VQWoej1Bg1mYWIDYslvKrk1gpj7wZg /fy3ZpsL7WqgsZS7Q+0VRK8gKfqkxg5OYQIDAQAB -----END PRIVATE KEY-----""" RSA_KEY = b"""-----BEGIN RSA PRIVATE KEY----- MIIBCgKCAQEA4ej0p7bQ7L/r4rVGUz9RN4VQWoej1Bg1mYWIDYslvKrk1gpj7wZg /fy3ZpsL7WqgsZS7Q+0VRK8gKfqkxg5OYQIDAQAB -----END RSA PRIVATE KEY-----""" EC_KEY = b"""-----BEGIN EC PRIVATE KEY----- MIIBCgKCAQEA4ej0p7bQ7L/r4rVGUz9RN4VQWoej1Bg1mYWIDYslvKrk1gpj7wZg /fy3ZpsL7WqgsZS7Q+0VRK8gKfqkxg5OYQIDAQAB -----END EC PRIVATE KEY-----""" check_cert_and_key( pytest.public_cert_bytes + KEY, pytest.public_cert_bytes, KEY ) check_cert_and_key( pytest.public_cert_bytes + RSA_KEY, pytest.public_cert_bytes, RSA_KEY ) check_cert_and_key( pytest.public_cert_bytes + EC_KEY, pytest.public_cert_bytes, EC_KEY ) class TestCheckaMetadataPath(object): def test_success(self): metadata_path = os.path.join(DATA_DIR, "context_aware_metadata.json") returned_path = _mtls_helper._check_dca_metadata_path(metadata_path) assert returned_path is not None def test_failure(self): metadata_path = os.path.join(DATA_DIR, "not_exists.json") returned_path = _mtls_helper._check_dca_metadata_path(metadata_path) assert returned_path is None class TestReadMetadataFile(object): def test_success(self): metadata_path = os.path.join(DATA_DIR, "context_aware_metadata.json") metadata = _mtls_helper._read_dca_metadata_file(metadata_path) assert "cert_provider_command" in metadata def test_file_not_json(self): # read a file which is not json format. metadata_path = os.path.join(DATA_DIR, "privatekey.pem") with pytest.raises(exceptions.ClientCertError): _mtls_helper._read_dca_metadata_file(metadata_path) class TestRunCertProviderCommand(object): def create_mock_process(self, output, error): # There are two steps to execute a script with subprocess.Popen. # (1) process = subprocess.Popen([comannds]) # (2) stdout, stderr = process.communicate() # This function creates a mock process which can be returned by a mock # subprocess.Popen. The mock process returns the given output and error # when mock_process.communicate() is called. mock_process = mock.Mock() attrs = {"communicate.return_value": (output, error), "returncode": 0} mock_process.configure_mock(**attrs) return mock_process @mock.patch("subprocess.Popen", autospec=True) def test_success(self, mock_popen): mock_popen.return_value = self.create_mock_process( pytest.public_cert_bytes + pytest.private_key_bytes, b"" ) cert, key, passphrase = _mtls_helper._run_cert_provider_command(["command"]) assert cert == pytest.public_cert_bytes assert key == pytest.private_key_bytes assert passphrase is None mock_popen.return_value = self.create_mock_process( pytest.public_cert_bytes + ENCRYPTED_EC_PRIVATE_KEY + PASSPHRASE, b"" ) cert, key, passphrase = _mtls_helper._run_cert_provider_command( ["command"], expect_encrypted_key=True ) assert cert == pytest.public_cert_bytes assert key == ENCRYPTED_EC_PRIVATE_KEY assert passphrase == PASSPHRASE_VALUE @mock.patch("subprocess.Popen", autospec=True) def test_success_with_cert_chain(self, mock_popen): PUBLIC_CERT_CHAIN_BYTES = pytest.public_cert_bytes + pytest.public_cert_bytes mock_popen.return_value = self.create_mock_process( PUBLIC_CERT_CHAIN_BYTES + pytest.private_key_bytes, b"" ) cert, key, passphrase = _mtls_helper._run_cert_provider_command(["command"]) assert cert == PUBLIC_CERT_CHAIN_BYTES assert key == pytest.private_key_bytes assert passphrase is None mock_popen.return_value = self.create_mock_process( PUBLIC_CERT_CHAIN_BYTES + ENCRYPTED_EC_PRIVATE_KEY + PASSPHRASE, b"" ) cert, key, passphrase = _mtls_helper._run_cert_provider_command( ["command"], expect_encrypted_key=True ) assert cert == PUBLIC_CERT_CHAIN_BYTES assert key == ENCRYPTED_EC_PRIVATE_KEY assert passphrase == PASSPHRASE_VALUE @mock.patch("subprocess.Popen", autospec=True) def test_missing_cert(self, mock_popen): mock_popen.return_value = self.create_mock_process( pytest.private_key_bytes, b"" ) with pytest.raises(exceptions.ClientCertError): _mtls_helper._run_cert_provider_command(["command"]) mock_popen.return_value = self.create_mock_process( ENCRYPTED_EC_PRIVATE_KEY + PASSPHRASE, b"" ) with pytest.raises(exceptions.ClientCertError): _mtls_helper._run_cert_provider_command( ["command"], expect_encrypted_key=True ) @mock.patch("subprocess.Popen", autospec=True) def test_missing_key(self, mock_popen): mock_popen.return_value = self.create_mock_process( pytest.public_cert_bytes, b"" ) with pytest.raises(exceptions.ClientCertError): _mtls_helper._run_cert_provider_command(["command"]) mock_popen.return_value = self.create_mock_process( pytest.public_cert_bytes + PASSPHRASE, b"" ) with pytest.raises(exceptions.ClientCertError): _mtls_helper._run_cert_provider_command( ["command"], expect_encrypted_key=True ) @mock.patch("subprocess.Popen", autospec=True) def test_missing_passphrase(self, mock_popen): mock_popen.return_value = self.create_mock_process( pytest.public_cert_bytes + ENCRYPTED_EC_PRIVATE_KEY, b"" ) with pytest.raises(exceptions.ClientCertError): _mtls_helper._run_cert_provider_command( ["command"], expect_encrypted_key=True ) @mock.patch("subprocess.Popen", autospec=True) def test_passphrase_not_expected(self, mock_popen): mock_popen.return_value = self.create_mock_process( pytest.public_cert_bytes + pytest.private_key_bytes + PASSPHRASE, b"" ) with pytest.raises(exceptions.ClientCertError): _mtls_helper._run_cert_provider_command(["command"]) @mock.patch("subprocess.Popen", autospec=True) def test_encrypted_key_expected(self, mock_popen): mock_popen.return_value = self.create_mock_process( pytest.public_cert_bytes + pytest.private_key_bytes + PASSPHRASE, b"" ) with pytest.raises(exceptions.ClientCertError): _mtls_helper._run_cert_provider_command( ["command"], expect_encrypted_key=True ) @mock.patch("subprocess.Popen", autospec=True) def test_unencrypted_key_expected(self, mock_popen): mock_popen.return_value = self.create_mock_process( pytest.public_cert_bytes + ENCRYPTED_EC_PRIVATE_KEY, b"" ) with pytest.raises(exceptions.ClientCertError): _mtls_helper._run_cert_provider_command(["command"]) @mock.patch("subprocess.Popen", autospec=True) def test_cert_provider_returns_error(self, mock_popen): mock_popen.return_value = self.create_mock_process(b"", b"some error") mock_popen.return_value.returncode = 1 with pytest.raises(exceptions.ClientCertError): _mtls_helper._run_cert_provider_command(["command"]) @mock.patch("subprocess.Popen", autospec=True) def test_popen_raise_exception(self, mock_popen): mock_popen.side_effect = OSError() with pytest.raises(exceptions.ClientCertError): _mtls_helper._run_cert_provider_command(["command"]) class TestGetClientSslCredentials(object): @mock.patch( "google.auth.transport._mtls_helper._run_cert_provider_command", autospec=True ) @mock.patch( "google.auth.transport._mtls_helper._read_dca_metadata_file", autospec=True ) @mock.patch( "google.auth.transport._mtls_helper._check_dca_metadata_path", autospec=True ) def test_success( self, mock_check_dca_metadata_path, mock_read_dca_metadata_file, mock_run_cert_provider_command, ): mock_check_dca_metadata_path.return_value = True mock_read_dca_metadata_file.return_value = { "cert_provider_command": ["command"] } mock_run_cert_provider_command.return_value = (b"cert", b"key", None) has_cert, cert, key, passphrase = _mtls_helper.get_client_ssl_credentials() assert has_cert assert cert == b"cert" assert key == b"key" assert passphrase is None @mock.patch( "google.auth.transport._mtls_helper._check_dca_metadata_path", autospec=True ) def test_success_without_metadata(self, mock_check_dca_metadata_path): mock_check_dca_metadata_path.return_value = False has_cert, cert, key, passphrase = _mtls_helper.get_client_ssl_credentials() assert not has_cert assert cert is None assert key is None assert passphrase is None @mock.patch( "google.auth.transport._mtls_helper._run_cert_provider_command", autospec=True ) @mock.patch( "google.auth.transport._mtls_helper._read_dca_metadata_file", autospec=True ) @mock.patch( "google.auth.transport._mtls_helper._check_dca_metadata_path", autospec=True ) def test_success_with_encrypted_key( self, mock_check_dca_metadata_path, mock_read_dca_metadata_file, mock_run_cert_provider_command, ): mock_check_dca_metadata_path.return_value = True mock_read_dca_metadata_file.return_value = { "cert_provider_command": ["command"] } mock_run_cert_provider_command.return_value = (b"cert", b"key", b"passphrase") has_cert, cert, key, passphrase = _mtls_helper.get_client_ssl_credentials( generate_encrypted_key=True ) assert has_cert assert cert == b"cert" assert key == b"key" assert passphrase == b"passphrase" mock_run_cert_provider_command.assert_called_once_with( ["command", "--with_passphrase"], expect_encrypted_key=True ) @mock.patch( "google.auth.transport._mtls_helper._read_dca_metadata_file", autospec=True ) @mock.patch( "google.auth.transport._mtls_helper._check_dca_metadata_path", autospec=True ) def test_missing_cert_command( self, mock_check_dca_metadata_path, mock_read_dca_metadata_file ): mock_check_dca_metadata_path.return_value = True mock_read_dca_metadata_file.return_value = {} with pytest.raises(exceptions.ClientCertError): _mtls_helper.get_client_ssl_credentials() @mock.patch( "google.auth.transport._mtls_helper._run_cert_provider_command", autospec=True ) @mock.patch( "google.auth.transport._mtls_helper._read_dca_metadata_file", autospec=True ) @mock.patch( "google.auth.transport._mtls_helper._check_dca_metadata_path", autospec=True ) def test_customize_context_aware_metadata_path( self, mock_check_dca_metadata_path, mock_read_dca_metadata_file, mock_run_cert_provider_command, ): context_aware_metadata_path = "/path/to/metata/data" mock_check_dca_metadata_path.return_value = context_aware_metadata_path mock_read_dca_metadata_file.return_value = { "cert_provider_command": ["command"] } mock_run_cert_provider_command.return_value = (b"cert", b"key", None) has_cert, cert, key, passphrase = _mtls_helper.get_client_ssl_credentials( context_aware_metadata_path=context_aware_metadata_path ) assert has_cert assert cert == b"cert" assert key == b"key" assert passphrase is None mock_check_dca_metadata_path.assert_called_with(context_aware_metadata_path) mock_read_dca_metadata_file.assert_called_with(context_aware_metadata_path) class TestGetClientCertAndKey(object): def test_callback_success(self): callback = mock.Mock() callback.return_value = (pytest.public_cert_bytes, pytest.private_key_bytes) found_cert_key, cert, key = _mtls_helper.get_client_cert_and_key(callback) assert found_cert_key assert cert == pytest.public_cert_bytes assert key == pytest.private_key_bytes @mock.patch( "google.auth.transport._mtls_helper.get_client_ssl_credentials", autospec=True ) def test_use_metadata(self, mock_get_client_ssl_credentials): mock_get_client_ssl_credentials.return_value = ( True, pytest.public_cert_bytes, pytest.private_key_bytes, None, ) found_cert_key, cert, key = _mtls_helper.get_client_cert_and_key() assert found_cert_key assert cert == pytest.public_cert_bytes assert key == pytest.private_key_bytes class TestDecryptPrivateKey(object): def test_success(self): decrypted_key = _mtls_helper.decrypt_private_key( ENCRYPTED_EC_PRIVATE_KEY, PASSPHRASE_VALUE ) private_key = crypto.load_privatekey(crypto.FILETYPE_PEM, decrypted_key) public_key = crypto.load_publickey(crypto.FILETYPE_PEM, EC_PUBLIC_KEY) x509 = crypto.X509() x509.set_pubkey(public_key) # Test the decrypted key works by signing and verification. signature = crypto.sign(private_key, b"data", "sha256") crypto.verify(x509, signature, b"data", "sha256") def test_crypto_error(self): with pytest.raises(crypto.Error): _mtls_helper.decrypt_private_key( ENCRYPTED_EC_PRIVATE_KEY, b"wrong_password" )