123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364 |
- # Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License"). You
- # may not use this file except in compliance with the License. A copy of
- # the License is located at
- #
- # http://aws.amazon.com/apache2.0/
- #
- # or in the "license" file accompanying this file. This file is
- # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
- # ANY KIND, either express or implied. See the License for the specific
- # language governing permissions and limitations under the License.
- import io
- import pytest
- from botocore.credentials import Credentials, ReadOnlyCredentials
- from botocore.exceptions import ClientError, NoCredentialsError
- from botocore.session import Session
- from s3transfer.exceptions import TransferNotDoneError
- from s3transfer.utils import CallArgs
- from __tests__ import HAS_CRT, FileCreator, mock, requires_crt, unittest
- if HAS_CRT:
- import awscrt.auth
- import awscrt.s3
- import s3transfer.crt
# Skip marker for the pytest-style test classes in this module: any class or
# function decorated with it is skipped when HAS_CRT is false (per the reason
# string, when awscrt is not installed).
requires_crt_pytest = pytest.mark.skipif(
    not HAS_CRT, reason="Test requires awscrt to be installed."
)
@pytest.fixture
def mock_crt_process_lock(monkeypatch):
    """Yield a mock standing in for ``awscrt.s3.CrossProcessLock``.

    The process lock is cached at the module layer whenever the cross
    process lock is successfully acquired. Resetting the cached value to
    ``None`` ensures each test case starts with no previously cached
    process lock, and patching the lock class ensures that any cross
    process lock that gets instantiated/acquired is the mock, which the
    test can use to control lock behavior.
    """
    monkeypatch.setattr('s3transfer.crt.CRT_S3_PROCESS_LOCK', None)
    lock_patcher = mock.patch('awscrt.s3.CrossProcessLock', spec=True)
    patched_lock_cls = lock_patcher.start()
    try:
        yield patched_lock_cls
    finally:
        # Always undo the patch, mirroring what leaving a ``with`` block does.
        lock_patcher.stop()
@pytest.fixture
def mock_s3_crt_client():
    """Yield a spec'd mock that replaces ``s3transfer.crt.S3Client``."""
    client_patcher = mock.patch('s3transfer.crt.S3Client', spec=True)
    patched_client_cls = client_patcher.start()
    try:
        yield patched_client_cls
    finally:
        # Restore the real class once the test using the fixture is done.
        client_patcher.stop()
@pytest.fixture
def mock_get_recommended_throughput_target_gbps():
    """Yield a mock replacing the recommended-throughput lookup.

    The patch targets the name as it is bound inside ``s3transfer.crt``, so
    tests can set ``return_value`` to choose what that module sees.
    """
    target_patcher = mock.patch(
        's3transfer.crt.get_recommended_throughput_target_gbps'
    )
    patched_lookup = target_patcher.start()
    try:
        yield patched_lookup
    finally:
        # Restore the real function once the test using the fixture is done.
        target_patcher.stop()
class CustomFutureException(Exception):
    """Distinct exception type the future tests set and expect to be raised."""
@requires_crt_pytest
class TestCRTProcessLock:
    """Tests for ``s3transfer.crt.acquire_crt_s3_process_lock``.

    Every test takes the ``mock_crt_process_lock`` fixture, so the lock
    class is a mock and the module-level cached lock starts out as ``None``.
    """

    def test_acquire_crt_s3_process_lock(self, mock_crt_process_lock):
        """A successful acquire returns the lock and caches it on the module."""
        lock_instance = mock_crt_process_lock.return_value

        acquired = s3transfer.crt.acquire_crt_s3_process_lock('app-name')

        assert acquired is s3transfer.crt.CRT_S3_PROCESS_LOCK
        assert acquired is lock_instance
        mock_crt_process_lock.assert_called_once_with('app-name')
        lock_instance.acquire.assert_called_once_with()

    def test_unable_to_acquire_lock_returns_none(self, mock_crt_process_lock):
        """A ``RuntimeError`` from ``acquire()`` yields ``None`` and no cache."""
        lock_instance = mock_crt_process_lock.return_value
        lock_instance.acquire.side_effect = RuntimeError

        result = s3transfer.crt.acquire_crt_s3_process_lock('app-name')

        assert result is None
        assert s3transfer.crt.CRT_S3_PROCESS_LOCK is None
        mock_crt_process_lock.assert_called_once_with('app-name')
        lock_instance.acquire.assert_called_once_with()

    def test_multiple_acquires_return_same_lock(self, mock_crt_process_lock):
        """Repeated acquires hand back the same cached lock object."""
        first = s3transfer.crt.acquire_crt_s3_process_lock('app-name')
        second = s3transfer.crt.acquire_crt_s3_process_lock('app-name')

        assert second is first
        assert first is s3transfer.crt.CRT_S3_PROCESS_LOCK
        # The process lock should have only been instantiated and acquired once
        mock_crt_process_lock.assert_called_once_with('app-name')
        mock_crt_process_lock.return_value.acquire.assert_called_once_with()
@requires_crt
class TestBotocoreCRTRequestSerializer(unittest.TestCase):
    """Tests for ``s3transfer.crt.BotocoreCRTRequestSerializer``.

    Covers serializing put/get/delete object calls into CRT HTTP requests
    and translating CRT S3 response errors into botocore exceptions.
    """

    def setUp(self):
        self.region = 'us-west-2'
        self.session = Session()
        self.session.set_config_variable('region', self.region)
        self.request_serializer = s3transfer.crt.BotocoreCRTRequestSerializer(
            self.session
        )
        self.bucket = "test_bucket"
        self.key = "test_key"
        self.files = FileCreator()
        self.filename = self.files.create_file('myfile', 'my content')
        # Expected request path and host header for the bucket/key/region
        # configured above.
        self.expected_path = f"/{self.bucket}/{self.key}"
        self.expected_host = f"s3.{self.region}.amazonaws.com"

    def tearDown(self):
        self.files.remove_all()

    def _serialize(self, operation_name, **extra_call_args):
        """Serialize ``operation_name`` for the test bucket and key.

        :param operation_name: Client method name handed to the serializer,
            e.g. ``"put_object"``.
        :param extra_call_args: Additional ``CallArgs`` attributes such as
            ``fileobj``.
        :returns: The CRT HTTP request built by the serializer.
        """
        call_args = CallArgs(
            bucket=self.bucket,
            key=self.key,
            extra_args={},
            subscribers=[],
            **extra_call_args,
        )
        future = s3transfer.crt.CRTTransferFuture(
            s3transfer.crt.CRTTransferMeta(call_args=call_args),
            s3transfer.crt.CRTTransferCoordinator(),
        )
        return self.request_serializer.serialize_http_request(
            operation_name, future
        )

    def _assert_serialized_request(self, crt_request, expected_method):
        """Check method, path, host and the absence of an Authorization header."""
        self.assertEqual(expected_method, crt_request.method)
        self.assertEqual(self.expected_path, crt_request.path)
        self.assertEqual(self.expected_host, crt_request.headers.get("host"))
        self.assertIsNone(crt_request.headers.get("Authorization"))

    def test_upload_request(self):
        crt_request = self._serialize("put_object", fileobj=self.filename)
        self._assert_serialized_request(crt_request, "PUT")

    def test_download_request(self):
        crt_request = self._serialize("get_object", fileobj=self.filename)
        self._assert_serialized_request(crt_request, "GET")

    def test_delete_request(self):
        crt_request = self._serialize("delete_object")
        self._assert_serialized_request(crt_request, "DELETE")

    def _create_crt_response_error(
        self, status_code, body, operation_name=None
    ):
        """Build an ``awscrt.s3.S3ResponseError`` with canned headers.

        :param status_code: HTTP status code to report on the error.
        :param body: Raw response body (bytes) or ``None``.
        :param operation_name: Optional S3 operation name, e.g.
            ``'GetObject'``.
        """
        return awscrt.s3.S3ResponseError(
            code=14343,
            name='AWS_ERROR_S3_INVALID_RESPONSE_STATUS',
            message='Invalid response status from request',
            status_code=status_code,
            headers=[
                ('x-amz-request-id', 'QSJHJJZR2EDYD4GQ'),
                (
                    'x-amz-id-2',
                    'xDbgdKdvYZTjgpOTzm7yNP2JPrOQl+eaQvUkFdOjdJoWkIC643fgHxdsHpUKvVAfjKf5F6otEYA=',
                ),
                ('Content-Type', 'application/xml'),
                ('Transfer-Encoding', 'chunked'),
                ('Date', 'Fri, 10 Nov 2023 23:22:47 GMT'),
                ('Server', 'AmazonS3'),
            ],
            body=body,
            operation_name=operation_name,
        )

    def test_translate_get_object_404(self):
        body = (
            b'<?xml version="1.0" encoding="UTF-8"?>\n<Error>'
            b'<Code>NoSuchKey</Code>'
            b'<Message>The specified key does not exist.</Message>'
            b'<Key>obviously-no-such-key.txt</Key>'
            b'<RequestId>SBJ7ZQY03N1WDW9T</RequestId>'
            b'<HostId>SomeHostId</HostId></Error>'
        )
        crt_exc = self._create_crt_response_error(404, body, 'GetObject')
        boto_err = self.request_serializer.translate_crt_exception(crt_exc)
        # The error code in the body should map to the modeled S3 exception.
        self.assertIsInstance(
            boto_err, self.session.create_client('s3').exceptions.NoSuchKey
        )

    def test_translate_head_object_404(self):
        # There's no body in a HEAD response, so we can't map it to a modeled
        # S3 exception. But it should still map to a botocore ClientError.
        body = None
        crt_exc = self._create_crt_response_error(
            404, body, operation_name='HeadObject'
        )
        boto_err = self.request_serializer.translate_crt_exception(crt_exc)
        self.assertIsInstance(boto_err, ClientError)

    def test_translate_unknown_operation_404(self):
        # No operation name and no body: still expected to be a ClientError.
        body = None
        crt_exc = self._create_crt_response_error(404, body)
        boto_err = self.request_serializer.translate_crt_exception(crt_exc)
        self.assertIsInstance(boto_err, ClientError)
@requires_crt_pytest
class TestBotocoreCRTCredentialsWrapper:
    """Tests for ``s3transfer.crt.BotocoreCRTCredentialsWrapper``."""

    @pytest.fixture
    def botocore_credentials(self):
        """Static botocore credentials matching the helper's default values."""
        return Credentials(
            access_key='access_key', secret_key='secret_key', token='token'
        )

    def assert_crt_credentials(
        self,
        crt_credentials,
        expected_access_key='access_key',
        expected_secret_key='secret_key',
        expected_token='token',
    ):
        """Assert the CRT credentials carry the expected key, secret and token."""
        actual = (
            crt_credentials.access_key_id,
            crt_credentials.secret_access_key,
            crt_credentials.session_token,
        )
        expected = (expected_access_key, expected_secret_key, expected_token)
        assert actual == expected

    def test_fetch_crt_credentials_successfully(self, botocore_credentials):
        """Calling the wrapper returns CRT credentials with matching values."""
        fetch_credentials = s3transfer.crt.BotocoreCRTCredentialsWrapper(
            botocore_credentials
        )
        self.assert_crt_credentials(fetch_credentials())

    def test_wrapper_does_not_cache_frozen_credentials(self):
        """Each wrapper call asks botocore for freshly frozen credentials."""
        frozen_sequence = [
            ReadOnlyCredentials('access_key_1', 'secret_key_1', 'token_1'),
            ReadOnlyCredentials('access_key_2', 'secret_key_2', 'token_2'),
        ]
        mock_credentials = mock.Mock(Credentials)
        mock_credentials.get_frozen_credentials.side_effect = frozen_sequence
        fetch_credentials = s3transfer.crt.BotocoreCRTCredentialsWrapper(
            mock_credentials
        )

        # Every call must reflect the next frozen set, not a cached one.
        for frozen in frozen_sequence:
            self.assert_crt_credentials(
                fetch_credentials(),
                expected_access_key=frozen.access_key,
                expected_secret_key=frozen.secret_key,
                expected_token=frozen.token,
            )

        assert mock_credentials.get_frozen_credentials.call_count == 2

    def test_raises_error_when_resolved_credentials_is_none(self):
        """Wrapping ``None`` makes the call raise ``NoCredentialsError``."""
        fetch_credentials = s3transfer.crt.BotocoreCRTCredentialsWrapper(None)
        with pytest.raises(NoCredentialsError):
            fetch_credentials()

    def test_to_crt_credentials_provider(self, botocore_credentials):
        """The wrapper converts to a CRT provider yielding the same values."""
        provider = s3transfer.crt.BotocoreCRTCredentialsWrapper(
            botocore_credentials
        ).to_crt_credentials_provider()

        assert isinstance(provider, awscrt.auth.AwsCredentialsProvider)
        # get_credentials() hands back a future; resolve it before checking.
        resolved = provider.get_credentials().result()
        self.assert_crt_credentials(resolved)
@requires_crt
class TestCRTTransferFuture(unittest.TestCase):
    """Tests for ``set_exception`` on ``s3transfer.crt.CRTTransferFuture``."""

    def setUp(self):
        crt_future = mock.Mock(awscrt.s3.Future)
        # NOTE(review): the request mock is specced on S3RequestType rather
        # than a request class - confirm this is the intended spec.
        s3_request = mock.Mock(awscrt.s3.S3RequestType)
        s3_request.finished_future = crt_future
        self.mock_s3_request = s3_request
        self.mock_crt_future = crt_future

        coordinator = s3transfer.crt.CRTTransferCoordinator()
        coordinator.set_s3_request(s3_request)
        self.coordinator = coordinator
        self.future = s3transfer.crt.CRTTransferFuture(coordinator=coordinator)

    def test_set_exception(self):
        """An exception set on the future is raised by ``result()``."""
        self.future.set_exception(CustomFutureException())
        self.assertRaises(CustomFutureException, self.future.result)

    def test_set_exception_raises_error_when_not_done(self):
        """Setting an exception while the CRT future is not done is an error."""
        self.mock_crt_future.done.return_value = False
        self.assertRaises(
            TransferNotDoneError,
            self.future.set_exception,
            CustomFutureException(),
        )

    def test_set_exception_can_override_previous_exception(self):
        """The most recently set exception is the one ``result()`` raises."""
        for exception in (Exception(), CustomFutureException()):
            self.future.set_exception(exception)
        self.assertRaises(CustomFutureException, self.future.result)
@requires_crt
class TestOnBodyFileObjWriter(unittest.TestCase):
    """Tests for ``s3transfer.crt.OnBodyFileObjWriter``."""

    def test_call(self):
        """Calling the writer with a chunk writes that chunk to the fileobj."""
        sink = io.BytesIO()
        on_body = s3transfer.crt.OnBodyFileObjWriter(sink)

        on_body(chunk=b'content')

        self.assertEqual(sink.getvalue(), b'content')
@requires_crt_pytest
class TestCreateS3CRTClient:
    """Tests for ``s3transfer.crt.create_s3_crt_client``."""

    @pytest.mark.parametrize(
        'provided_bytes_per_sec,recommended_gbps,expected_gbps',
        [
            (None, 100.0, 100.0),
            (None, None, 10.0),
            # NOTE: create_s3_crt_client() accepts target throughput as bytes
            # per second and it is converted to gigabits per second for the
            # CRT client instantiation.
            (1_000_000_000, None, 8.0),
            (1_000_000_000, 100.0, 8.0),
        ],
    )
    def test_target_throughput(
        self,
        provided_bytes_per_sec,
        recommended_gbps,
        expected_gbps,
        mock_s3_crt_client,
        mock_get_recommended_throughput_target_gbps,
    ):
        """Check the ``throughput_target_gbps`` passed to the CRT client.

        Each case pairs a caller-provided throughput (bytes per second, or
        ``None``) with the mocked recommended value (Gbps, or ``None``) and
        the Gbps value the patched ``S3Client`` is expected to receive.
        """
        recommended_lookup = mock_get_recommended_throughput_target_gbps
        recommended_lookup.return_value = recommended_gbps

        s3transfer.crt.create_s3_crt_client(
            'us-west-2',
            target_throughput=provided_bytes_per_sec,
        )

        # call_args unpacks into (positional args, keyword args).
        _, client_kwargs = mock_s3_crt_client.call_args
        assert client_kwargs['throughput_target_gbps'] == expected_gbps
|